From 38dc58bf55967a2af61d8aa2181430d212fd476e Mon Sep 17 00:00:00 2001 From: Suhaibinator <42899065+Suhaibinator@users.noreply.github.com> Date: Fri, 16 May 2025 17:37:58 -0700 Subject: [PATCH 1/2] Fix tests by enabling remote writes --- README.md | 4 +- main.go => cmd/smq/main.go | 8 +++- config/configuration.go | 6 +++ errors/errors.go | 4 ++ .../server}/database/driver.go | 0 .../server}/database/driver_interface.go | 0 .../server}/database/driver_test.go | 0 .../server}/database/topic_operations.go | 0 .../server}/database/topic_operations_test.go | 0 {server => internal/server}/server.go | 47 ++++++++++++++----- {server => internal/server}/server_test.go | 9 ++-- 11 files changed, 61 insertions(+), 17 deletions(-) rename main.go => cmd/smq/main.go (89%) rename {server => internal/server}/database/driver.go (100%) rename {server => internal/server}/database/driver_interface.go (100%) rename {server => internal/server}/database/driver_test.go (100%) rename {server => internal/server}/database/topic_operations.go (100%) rename {server => internal/server}/database/topic_operations_test.go (100%) rename {server => internal/server}/server.go (88%) rename {server => internal/server}/server_test.go (99%) diff --git a/README.md b/README.md index 59ae343..d3dcf49 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ To install SMQ, clone the repository and build the application using Go: ```bash git clone https://github.com/Suhaibinator/SuhaibMessageQueue.git cd SuhaibMessageQueue -go build +go build ./cmd/smq ``` ### Running SMQ @@ -70,6 +70,7 @@ docker run suhaibinator/smq:latest-arm64 - `--server-cert`: Path to the server's certificate file for mTLS. - `--server-key`: Path to the server's private key file for mTLS. - `--server-ca-cert`: Path to the CA certificate file for the server to verify clients. +- `--allow-remote-writes`: Allow remote gRPC clients to modify data. Default: `true`. If command-line flags are provided, they take the highest priority and override any other settings. @@ -85,6 +86,7 @@ If command-line flags are provided, they take the highest priority and override - `SERVER_CERT_FILE`: Path to server certificate file. - `SERVER_KEY_FILE`: Path to server key file. - `SERVER_CA_CERT_FILE`: Path to CA certificate for server to verify clients. +- `ALLOW_REMOTE_WRITES`: Allow remote gRPC clients to modify data (`true` or `false`). Environment variables take priority over default values but are overridden by command-line flags if they are set. diff --git a/main.go b/cmd/smq/main.go similarity index 89% rename from main.go rename to cmd/smq/main.go index bed4cd1..ad0eb74 100644 --- a/main.go +++ b/cmd/smq/main.go @@ -6,7 +6,7 @@ import ( "os" "github.com/Suhaibinator/SuhaibMessageQueue/config" - "github.com/Suhaibinator/SuhaibMessageQueue/server" + "github.com/Suhaibinator/SuhaibMessageQueue/internal/server" ) func init() { @@ -27,6 +27,7 @@ func init() { fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to server certificate file for mTLS\n", config.ENV_SERVER_CERT_FILE) fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to server key file for mTLS\n", config.ENV_SERVER_KEY_FILE) fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to CA certificate file for mTLS (server to verify client)\n", config.ENV_SERVER_CA_CERT_FILE) + fmt.Fprintf(flag.CommandLine.Output(), " %s: Allow remote gRPC clients to perform write operations (default true)\n", config.ENV_ALLOW_REMOTE_WRITES) } // Check environment variables @@ -60,6 +61,9 @@ func init() { if serverCACertFile, exists := os.LookupEnv(config.ENV_SERVER_CA_CERT_FILE); exists { config.ServerCACertFile = serverCACertFile } + if allowWrites, exists := os.LookupEnv(config.ENV_ALLOW_REMOTE_WRITES); exists { + config.AllowRemoteWrites = allowWrites != "false" + } // Check program arguments // Note: Command-line flag values will override environment variable settings for the corresponding configuration options. @@ -75,12 +79,14 @@ func init() { flag.StringVar(&config.ServerKeyFile, "server-key", config.ServerKeyFile, "path to server key file for mTLS") flag.StringVar(&config.ServerCACertFile, "server-ca-cert", config.ServerCACertFile, "path to CA certificate file for mTLS (server to verify client)") serverEnableMTLSFlag := flag.Bool("server-enable-mtls", config.ServerEnableMTLS, "enable server-side mTLS authentication") + allowWritesFlag := flag.Bool("allow-remote-writes", config.AllowRemoteWrites, "allow remote gRPC clients to perform write operations") flag.Parse() // Update EnableMTLS values from flags config.EnableMTLS = *clientEnableMTLSFlag // For client config.ServerEnableMTLS = *serverEnableMTLSFlag // For server + config.AllowRemoteWrites = *allowWritesFlag } func main() { diff --git a/config/configuration.go b/config/configuration.go index 39e9ac5..b41200f 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -13,6 +13,9 @@ const ( ENV_SERVER_CERT_FILE = "SERVER_CERT_FILE" ENV_SERVER_KEY_FILE = "SERVER_KEY_FILE" ENV_SERVER_CA_CERT_FILE = "SERVER_CA_CERT_FILE" + + // Control whether remote gRPC clients can modify data + ENV_ALLOW_REMOTE_WRITES = "ALLOW_REMOTE_WRITES" ) var ( @@ -31,4 +34,7 @@ var ( ServerCertFile = "" // Path to server's certificate file ServerKeyFile = "" // Path to server's private key file ServerCACertFile = "" // Path to CA certificate file for verifying client certificates + + // AllowRemoteWrites controls whether external gRPC callers can perform write operations + AllowRemoteWrites = true ) diff --git a/errors/errors.go b/errors/errors.go index e830190..3d04ecc 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -15,6 +15,8 @@ const ( ErrTopicIsEmptyMessage = "topic is empty" // ErrDeletingTopic is returned when a topic cannot be deleted ErrDeletingTopicMessage = "error deleting topic" + // ErrRemoteWritesDisabled is returned when remote writes are disabled + ErrRemoteWritesDisabledMessage = "remote writes disabled" ) var ( @@ -30,4 +32,6 @@ var ( ErrTopicIsEmpty = errors.New(ErrTopicIsEmptyMessage) // ErrDeletingTopic is returned when a topic cannot be deleted ErrDeletingTopic = errors.New(ErrDeletingTopicMessage) + // ErrRemoteWritesDisabled is returned when remote write operations are not allowed + ErrRemoteWritesDisabled = errors.New(ErrRemoteWritesDisabledMessage) ) diff --git a/server/database/driver.go b/internal/server/database/driver.go similarity index 100% rename from server/database/driver.go rename to internal/server/database/driver.go diff --git a/server/database/driver_interface.go b/internal/server/database/driver_interface.go similarity index 100% rename from server/database/driver_interface.go rename to internal/server/database/driver_interface.go diff --git a/server/database/driver_test.go b/internal/server/database/driver_test.go similarity index 100% rename from server/database/driver_test.go rename to internal/server/database/driver_test.go diff --git a/server/database/topic_operations.go b/internal/server/database/topic_operations.go similarity index 100% rename from server/database/topic_operations.go rename to internal/server/database/topic_operations.go diff --git a/server/database/topic_operations_test.go b/internal/server/database/topic_operations_test.go similarity index 100% rename from server/database/topic_operations_test.go rename to internal/server/database/topic_operations_test.go diff --git a/server/server.go b/internal/server/server.go similarity index 88% rename from server/server.go rename to internal/server/server.go index abaf285..4a21414 100644 --- a/server/server.go +++ b/internal/server/server.go @@ -9,18 +9,20 @@ import ( "time" "github.com/Suhaibinator/SuhaibMessageQueue/config" + smqerrors "github.com/Suhaibinator/SuhaibMessageQueue/errors" + "github.com/Suhaibinator/SuhaibMessageQueue/internal/server/database" pb "github.com/Suhaibinator/SuhaibMessageQueue/proto" - "github.com/Suhaibinator/SuhaibMessageQueue/server/database" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) // ServerOptions allows for programmatic configuration of the server. type ServerOptions struct { - EnableMTLS bool - TLSConfig *tls.Config - MaxRecvMsgSize int - MaxSendMsgSize int + EnableMTLS bool + TLSConfig *tls.Config + MaxRecvMsgSize int + MaxSendMsgSize int + AllowRemoteWrites bool } type Server struct { @@ -28,10 +30,14 @@ type Server struct { driver database.DBDriverInterface grpcServer *grpc.Server - port string + port string + allowRemoteWrites bool } func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.ProduceResponse, error) { + if !s.allowRemoteWrites { + return nil, smqerrors.ErrRemoteWritesDisabled + } err := s.driver.AddMessageToTopic(pr.Topic, pr.Message) if err != nil { return nil, err @@ -40,6 +46,9 @@ func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.Produc } func (s *Server) StreamProduce(sp pb.SuhaibMessageQueue_StreamProduceServer) error { + if !s.allowRemoteWrites { + return smqerrors.ErrRemoteWritesDisabled + } for { // Receive a message from the client message, err := sp.Recv() @@ -120,6 +129,9 @@ func (s *Server) streamMessages(topic string, cs pb.SuhaibMessageQueue_StreamCon } func (s *Server) CreateTopic(ctx context.Context, tr *pb.CreateTopicRequest) (*pb.CreateTopicResponse, error) { + if !s.allowRemoteWrites { + return nil, smqerrors.ErrRemoteWritesDisabled + } err := s.driver.CreateTopic(tr.Topic) if err != nil { return nil, err @@ -220,13 +232,23 @@ func NewServer(port, dbPath string, opts *ServerOptions) *Server { } grpcServer := grpc.NewServer(serverOpts...) - pb.RegisterSuhaibMessageQueueServer(grpcServer, &Server{driver: driver}) + allowWrites := config.AllowRemoteWrites + if opts != nil { + if opts.AllowRemoteWrites { + allowWrites = true + } else { + allowWrites = false + } + } - return &Server{ - driver: driver, - grpcServer: grpcServer, - port: port, + srv := &Server{ + driver: driver, + grpcServer: grpcServer, + port: port, + allowRemoteWrites: allowWrites, } + pb.RegisterSuhaibMessageQueueServer(grpcServer, srv) + return srv } func (s *Server) Start() { @@ -241,6 +263,9 @@ func (s *Server) Start() { } func (s *Server) DeleteUntilOffset(ctx context.Context, dr *pb.DeleteUntilOffsetRequest) (*pb.DeleteUntilOffsetResponse, error) { + if !s.allowRemoteWrites { + return nil, smqerrors.ErrRemoteWritesDisabled + } err := s.driver.DeleteMessagesUntilOffset(dr.Topic, dr.Offset) if err != nil { return nil, err diff --git a/server/server_test.go b/internal/server/server_test.go similarity index 99% rename from server/server_test.go rename to internal/server/server_test.go index 61cdaca..6b2135b 100644 --- a/server/server_test.go +++ b/internal/server/server_test.go @@ -9,8 +9,8 @@ import ( "github.com/Suhaibinator/SuhaibMessageQueue/config" "github.com/Suhaibinator/SuhaibMessageQueue/errors" + "github.com/Suhaibinator/SuhaibMessageQueue/internal/server/database" pb "github.com/Suhaibinator/SuhaibMessageQueue/proto" - "github.com/Suhaibinator/SuhaibMessageQueue/server/database" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" @@ -238,9 +238,10 @@ func setupTest(t *testing.T) (*Server, pb.SuhaibMessageQueueClient, func()) { // Create a server with the mock driver s := &Server{ - driver: mockDriver, - grpcServer: grpc.NewServer(), - port: "8097", + driver: mockDriver, + grpcServer: grpc.NewServer(), + port: "8097", + allowRemoteWrites: true, } // Register the server From 62011ca8f2ae77728422238c229a485d2f0ab209 Mon Sep 17 00:00:00 2001 From: Suhaibinator <42899065+Suhaibinator@users.noreply.github.com> Date: Fri, 16 May 2025 17:41:47 -0700 Subject: [PATCH 2/2] Update internal/server/server.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/server/server.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/server/server.go b/internal/server/server.go index 4a21414..84d5407 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -234,11 +234,7 @@ func NewServer(port, dbPath string, opts *ServerOptions) *Server { grpcServer := grpc.NewServer(serverOpts...) allowWrites := config.AllowRemoteWrites if opts != nil { - if opts.AllowRemoteWrites { - allowWrites = true - } else { - allowWrites = false - } + allowWrites = opts.AllowRemoteWrites } srv := &Server{