Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ To install SMQ, clone the repository and build the application using Go:
```bash
git clone https://github.com/Suhaibinator/SuhaibMessageQueue.git
cd SuhaibMessageQueue
go build
go build ./cmd/smq
```

### Running SMQ
Expand Down Expand Up @@ -70,6 +70,7 @@ docker run suhaibinator/smq:latest-arm64
- `--server-cert`: Path to the server's certificate file for mTLS.
- `--server-key`: Path to the server's private key file for mTLS.
- `--server-ca-cert`: Path to the CA certificate file for the server to verify clients.
- `--allow-remote-writes`: Allow remote gRPC clients to modify data. Default: `true`.

If command-line flags are provided, they take the highest priority and override any other settings.

Expand All @@ -85,6 +86,7 @@ If command-line flags are provided, they take the highest priority and override
- `SERVER_CERT_FILE`: Path to server certificate file.
- `SERVER_KEY_FILE`: Path to server key file.
- `SERVER_CA_CERT_FILE`: Path to CA certificate for server to verify clients.
- `ALLOW_REMOTE_WRITES`: Allow remote gRPC clients to modify data (`true` or `false`).

Environment variables take priority over default values but are overridden by command-line flags if they are set.

Expand Down
8 changes: 7 additions & 1 deletion main.go → cmd/smq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

"github.com/Suhaibinator/SuhaibMessageQueue/config"
"github.com/Suhaibinator/SuhaibMessageQueue/server"
"github.com/Suhaibinator/SuhaibMessageQueue/internal/server"
)

func init() {
Expand All @@ -27,6 +27,7 @@ func init() {
fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to server certificate file for mTLS\n", config.ENV_SERVER_CERT_FILE)
fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to server key file for mTLS\n", config.ENV_SERVER_KEY_FILE)
fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to CA certificate file for mTLS (server to verify client)\n", config.ENV_SERVER_CA_CERT_FILE)
fmt.Fprintf(flag.CommandLine.Output(), " %s: Allow remote gRPC clients to perform write operations (default true)\n", config.ENV_ALLOW_REMOTE_WRITES)
}

// Check environment variables
Expand Down Expand Up @@ -60,6 +61,9 @@ func init() {
if serverCACertFile, exists := os.LookupEnv(config.ENV_SERVER_CA_CERT_FILE); exists {
config.ServerCACertFile = serverCACertFile
}
if allowWrites, exists := os.LookupEnv(config.ENV_ALLOW_REMOTE_WRITES); exists {
config.AllowRemoteWrites = allowWrites != "false"
}

// Check program arguments
// Note: Command-line flag values will override environment variable settings for the corresponding configuration options.
Expand All @@ -75,12 +79,14 @@ func init() {
flag.StringVar(&config.ServerKeyFile, "server-key", config.ServerKeyFile, "path to server key file for mTLS")
flag.StringVar(&config.ServerCACertFile, "server-ca-cert", config.ServerCACertFile, "path to CA certificate file for mTLS (server to verify client)")
serverEnableMTLSFlag := flag.Bool("server-enable-mtls", config.ServerEnableMTLS, "enable server-side mTLS authentication")
allowWritesFlag := flag.Bool("allow-remote-writes", config.AllowRemoteWrites, "allow remote gRPC clients to perform write operations")

flag.Parse()

// Update EnableMTLS values from flags
config.EnableMTLS = *clientEnableMTLSFlag // For client
config.ServerEnableMTLS = *serverEnableMTLSFlag // For server
config.AllowRemoteWrites = *allowWritesFlag
}

func main() {
Expand Down
6 changes: 6 additions & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const (
ENV_SERVER_CERT_FILE = "SERVER_CERT_FILE"
ENV_SERVER_KEY_FILE = "SERVER_KEY_FILE"
ENV_SERVER_CA_CERT_FILE = "SERVER_CA_CERT_FILE"

// Control whether remote gRPC clients can modify data
ENV_ALLOW_REMOTE_WRITES = "ALLOW_REMOTE_WRITES"
)

var (
Expand All @@ -31,4 +34,7 @@ var (
ServerCertFile = "" // Path to server's certificate file
ServerKeyFile = "" // Path to server's private key file
ServerCACertFile = "" // Path to CA certificate file for verifying client certificates

// AllowRemoteWrites controls whether external gRPC callers can perform write operations
AllowRemoteWrites = true
)
4 changes: 4 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
ErrTopicIsEmptyMessage = "topic is empty"
// ErrDeletingTopic is returned when a topic cannot be deleted
ErrDeletingTopicMessage = "error deleting topic"
// ErrRemoteWritesDisabled is returned when remote writes are disabled
ErrRemoteWritesDisabledMessage = "remote writes disabled"
)

var (
Expand All @@ -30,4 +32,6 @@ var (
ErrTopicIsEmpty = errors.New(ErrTopicIsEmptyMessage)
// ErrDeletingTopic is returned when a topic cannot be deleted
ErrDeletingTopic = errors.New(ErrDeletingTopicMessage)
// ErrRemoteWritesDisabled is returned when remote write operations are not allowed
ErrRemoteWritesDisabled = errors.New(ErrRemoteWritesDisabledMessage)
)
File renamed without changes.
File renamed without changes.
43 changes: 32 additions & 11 deletions server/server.go → internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,35 @@ import (
"time"

"github.com/Suhaibinator/SuhaibMessageQueue/config"
smqerrors "github.com/Suhaibinator/SuhaibMessageQueue/errors"
"github.com/Suhaibinator/SuhaibMessageQueue/internal/server/database"
pb "github.com/Suhaibinator/SuhaibMessageQueue/proto"
"github.com/Suhaibinator/SuhaibMessageQueue/server/database"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// ServerOptions allows for programmatic configuration of the server.
type ServerOptions struct {
EnableMTLS bool
TLSConfig *tls.Config
MaxRecvMsgSize int
MaxSendMsgSize int
EnableMTLS bool
TLSConfig *tls.Config
MaxRecvMsgSize int
MaxSendMsgSize int
AllowRemoteWrites bool
}

type Server struct {
pb.UnimplementedSuhaibMessageQueueServer
driver database.DBDriverInterface
grpcServer *grpc.Server

port string
port string
allowRemoteWrites bool
}

func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.ProduceResponse, error) {
if !s.allowRemoteWrites {
return nil, smqerrors.ErrRemoteWritesDisabled
}
Comment on lines 37 to +40
Copy link

Copilot AI May 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider abstracting the remote write permission check into a helper function to avoid repetitive code across multiple endpoints.

Suggested change
func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.ProduceResponse, error) {
if !s.allowRemoteWrites {
return nil, smqerrors.ErrRemoteWritesDisabled
}
func (s *Server) checkRemoteWritePermission() error {
if !s.allowRemoteWrites {
return smqerrors.ErrRemoteWritesDisabled
}
return nil
}
func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.ProduceResponse, error) {
if err := s.checkRemoteWritePermission(); err != nil {
return nil, err
}

Copilot uses AI. Check for mistakes.
err := s.driver.AddMessageToTopic(pr.Topic, pr.Message)
if err != nil {
return nil, err
Expand All @@ -40,6 +46,9 @@ func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.Produc
}

func (s *Server) StreamProduce(sp pb.SuhaibMessageQueue_StreamProduceServer) error {
if !s.allowRemoteWrites {
return smqerrors.ErrRemoteWritesDisabled
}
for {
// Receive a message from the client
message, err := sp.Recv()
Expand Down Expand Up @@ -120,6 +129,9 @@ func (s *Server) streamMessages(topic string, cs pb.SuhaibMessageQueue_StreamCon
}

func (s *Server) CreateTopic(ctx context.Context, tr *pb.CreateTopicRequest) (*pb.CreateTopicResponse, error) {
if !s.allowRemoteWrites {
return nil, smqerrors.ErrRemoteWritesDisabled
}
err := s.driver.CreateTopic(tr.Topic)
if err != nil {
return nil, err
Expand Down Expand Up @@ -220,13 +232,19 @@ func NewServer(port, dbPath string, opts *ServerOptions) *Server {
}

grpcServer := grpc.NewServer(serverOpts...)
pb.RegisterSuhaibMessageQueueServer(grpcServer, &Server{driver: driver})
allowWrites := config.AllowRemoteWrites
if opts != nil {
allowWrites = opts.AllowRemoteWrites
}

return &Server{
driver: driver,
grpcServer: grpcServer,
port: port,
srv := &Server{
driver: driver,
grpcServer: grpcServer,
port: port,
allowRemoteWrites: allowWrites,
}
pb.RegisterSuhaibMessageQueueServer(grpcServer, srv)
return srv
}

func (s *Server) Start() {
Expand All @@ -241,6 +259,9 @@ func (s *Server) Start() {
}

func (s *Server) DeleteUntilOffset(ctx context.Context, dr *pb.DeleteUntilOffsetRequest) (*pb.DeleteUntilOffsetResponse, error) {
if !s.allowRemoteWrites {
return nil, smqerrors.ErrRemoteWritesDisabled
}
err := s.driver.DeleteMessagesUntilOffset(dr.Topic, dr.Offset)
if err != nil {
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions server/server_test.go → internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/Suhaibinator/SuhaibMessageQueue/config"
"github.com/Suhaibinator/SuhaibMessageQueue/errors"
"github.com/Suhaibinator/SuhaibMessageQueue/internal/server/database"
pb "github.com/Suhaibinator/SuhaibMessageQueue/proto"
"github.com/Suhaibinator/SuhaibMessageQueue/server/database"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
Expand Down Expand Up @@ -238,9 +238,10 @@ func setupTest(t *testing.T) (*Server, pb.SuhaibMessageQueueClient, func()) {

// Create a server with the mock driver
s := &Server{
driver: mockDriver,
grpcServer: grpc.NewServer(),
port: "8097",
driver: mockDriver,
grpcServer: grpc.NewServer(),
port: "8097",
allowRemoteWrites: true,
}

// Register the server
Expand Down
Loading