Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ENV CGO_ENABLED=1
ENV GOOS=linux

# Build the Go application
RUN go build -o SuhaibMessageQueue
RUN go build -o smq ./cmd/smq

# Stage 2: Create the final image
FROM alpine:latest
Expand All @@ -28,10 +28,10 @@ WORKDIR /app
VOLUME /db

# Copy the binary from the builder stage to the working directory
COPY --from=builder /src/SuhaibMessageQueue .
COPY --from=builder /src/smq .

# Make the binary executable
RUN chmod +x SuhaibMessageQueue
RUN chmod +x smq

# Specify the command to run when the container starts
CMD ["./SuhaibMessageQueue"]
CMD ["./smq"]
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ To install SMQ, clone the repository and build the application using Go:
```bash
git clone https://github.com/Suhaibinator/SuhaibMessageQueue.git
cd SuhaibMessageQueue
go build
go build ./cmd/smq
```

### Running SMQ
Expand Down Expand Up @@ -70,6 +70,7 @@ docker run suhaibinator/smq:latest-arm64
- `--server-cert`: Path to the server's certificate file for mTLS.
- `--server-key`: Path to the server's private key file for mTLS.
- `--server-ca-cert`: Path to the CA certificate file for the server to verify clients.
- `--allow-remote-writes`: Allow remote gRPC clients to modify data. Default: `true`.

If command-line flags are provided, they take the highest priority and override any other settings.

Expand All @@ -85,6 +86,9 @@ If command-line flags are provided, they take the highest priority and override
- `SERVER_CERT_FILE`: Path to server certificate file.
- `SERVER_KEY_FILE`: Path to server key file.
- `SERVER_CA_CERT_FILE`: Path to CA certificate for server to verify clients.
- `ALLOW_REMOTE_WRITES`: Allow remote gRPC clients to modify data (`true` or `false`).

To embed SMQ without allowing external write operations, set `ALLOW_REMOTE_WRITES=false` or start the server with `--allow-remote-writes=false`.

Environment variables take priority over default values but are overridden by command-line flags if they are set.

Expand Down
8 changes: 7 additions & 1 deletion main.go → cmd/smq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

"github.com/Suhaibinator/SuhaibMessageQueue/config"
"github.com/Suhaibinator/SuhaibMessageQueue/server"
"github.com/Suhaibinator/SuhaibMessageQueue/internal/server"
)

func init() {
Expand All @@ -27,6 +27,7 @@ func init() {
fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to server certificate file for mTLS\n", config.ENV_SERVER_CERT_FILE)
fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to server key file for mTLS\n", config.ENV_SERVER_KEY_FILE)
fmt.Fprintf(flag.CommandLine.Output(), " %s: Path to CA certificate file for mTLS (server to verify client)\n", config.ENV_SERVER_CA_CERT_FILE)
fmt.Fprintf(flag.CommandLine.Output(), " %s: Allow remote gRPC clients to perform write operations (default true)\n", config.ENV_ALLOW_REMOTE_WRITES)
}

// Check environment variables
Expand Down Expand Up @@ -60,6 +61,9 @@ func init() {
if serverCACertFile, exists := os.LookupEnv(config.ENV_SERVER_CA_CERT_FILE); exists {
config.ServerCACertFile = serverCACertFile
}
if allowWrites, exists := os.LookupEnv(config.ENV_ALLOW_REMOTE_WRITES); exists {
config.AllowRemoteWrites = allowWrites != "false"
}

// Check program arguments
// Note: Command-line flag values will override environment variable settings for the corresponding configuration options.
Expand All @@ -75,12 +79,14 @@ func init() {
flag.StringVar(&config.ServerKeyFile, "server-key", config.ServerKeyFile, "path to server key file for mTLS")
flag.StringVar(&config.ServerCACertFile, "server-ca-cert", config.ServerCACertFile, "path to CA certificate file for mTLS (server to verify client)")
serverEnableMTLSFlag := flag.Bool("server-enable-mtls", config.ServerEnableMTLS, "enable server-side mTLS authentication")
allowWritesFlag := flag.Bool("allow-remote-writes", config.AllowRemoteWrites, "allow remote gRPC clients to perform write operations")

flag.Parse()

// Update EnableMTLS values from flags
config.EnableMTLS = *clientEnableMTLSFlag // For client
config.ServerEnableMTLS = *serverEnableMTLSFlag // For server
config.AllowRemoteWrites = *allowWritesFlag
}

func main() {
Expand Down
6 changes: 6 additions & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const (
ENV_SERVER_CERT_FILE = "SERVER_CERT_FILE"
ENV_SERVER_KEY_FILE = "SERVER_KEY_FILE"
ENV_SERVER_CA_CERT_FILE = "SERVER_CA_CERT_FILE"

// Control whether remote gRPC clients can modify data
ENV_ALLOW_REMOTE_WRITES = "ALLOW_REMOTE_WRITES"
)

var (
Expand All @@ -31,4 +34,7 @@ var (
ServerCertFile = "" // Path to server's certificate file
ServerKeyFile = "" // Path to server's private key file
ServerCACertFile = "" // Path to CA certificate file for verifying client certificates

// AllowRemoteWrites controls whether external gRPC callers can perform write operations
AllowRemoteWrites = true
)
4 changes: 4 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
ErrTopicIsEmptyMessage = "topic is empty"
// ErrDeletingTopic is returned when a topic cannot be deleted
ErrDeletingTopicMessage = "error deleting topic"
// ErrRemoteWritesDisabled is returned when remote writes are disabled
ErrRemoteWritesDisabledMessage = "remote writes disabled"
)

var (
Expand All @@ -30,4 +32,6 @@ var (
ErrTopicIsEmpty = errors.New(ErrTopicIsEmptyMessage)
// ErrDeletingTopic is returned when a topic cannot be deleted
ErrDeletingTopic = errors.New(ErrDeletingTopicMessage)
// ErrRemoteWritesDisabled is returned when remote write operations are not allowed
ErrRemoteWritesDisabled = errors.New(ErrRemoteWritesDisabledMessage)
)
File renamed without changes.
File renamed without changes.
47 changes: 36 additions & 11 deletions server/server.go → internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,35 @@ import (
"time"

"github.com/Suhaibinator/SuhaibMessageQueue/config"
smqerrors "github.com/Suhaibinator/SuhaibMessageQueue/errors"
"github.com/Suhaibinator/SuhaibMessageQueue/internal/server/database"
pb "github.com/Suhaibinator/SuhaibMessageQueue/proto"
"github.com/Suhaibinator/SuhaibMessageQueue/server/database"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// ServerOptions allows for programmatic configuration of the server.
type ServerOptions struct {
EnableMTLS bool
TLSConfig *tls.Config
MaxRecvMsgSize int
MaxSendMsgSize int
EnableMTLS bool
TLSConfig *tls.Config
MaxRecvMsgSize int
MaxSendMsgSize int
AllowRemoteWrites bool
}

type Server struct {
pb.UnimplementedSuhaibMessageQueueServer
driver database.DBDriverInterface
grpcServer *grpc.Server

port string
port string
allowRemoteWrites bool
}

func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.ProduceResponse, error) {
if !s.allowRemoteWrites {
return nil, smqerrors.ErrRemoteWritesDisabled
}
err := s.driver.AddMessageToTopic(pr.Topic, pr.Message)
if err != nil {
return nil, err
Expand All @@ -40,6 +46,9 @@ func (s *Server) Produce(ctx context.Context, pr *pb.ProduceRequest) (*pb.Produc
}

func (s *Server) StreamProduce(sp pb.SuhaibMessageQueue_StreamProduceServer) error {
if !s.allowRemoteWrites {
return smqerrors.ErrRemoteWritesDisabled
}
for {
// Receive a message from the client
message, err := sp.Recv()
Expand Down Expand Up @@ -120,6 +129,9 @@ func (s *Server) streamMessages(topic string, cs pb.SuhaibMessageQueue_StreamCon
}

func (s *Server) CreateTopic(ctx context.Context, tr *pb.CreateTopicRequest) (*pb.CreateTopicResponse, error) {
if !s.allowRemoteWrites {
return nil, smqerrors.ErrRemoteWritesDisabled
}
err := s.driver.CreateTopic(tr.Topic)
if err != nil {
return nil, err
Expand Down Expand Up @@ -220,13 +232,23 @@ func NewServer(port, dbPath string, opts *ServerOptions) *Server {
}

grpcServer := grpc.NewServer(serverOpts...)
pb.RegisterSuhaibMessageQueueServer(grpcServer, &Server{driver: driver})
allowWrites := config.AllowRemoteWrites
if opts != nil {
if opts.AllowRemoteWrites {
allowWrites = true
} else {
allowWrites = false
}
}

return &Server{
driver: driver,
grpcServer: grpcServer,
port: port,
srv := &Server{
driver: driver,
grpcServer: grpcServer,
port: port,
allowRemoteWrites: allowWrites,
}
pb.RegisterSuhaibMessageQueueServer(grpcServer, srv)
return srv
}

func (s *Server) Start() {
Expand All @@ -241,6 +263,9 @@ func (s *Server) Start() {
}

func (s *Server) DeleteUntilOffset(ctx context.Context, dr *pb.DeleteUntilOffsetRequest) (*pb.DeleteUntilOffsetResponse, error) {
if !s.allowRemoteWrites {
return nil, smqerrors.ErrRemoteWritesDisabled
}
err := s.driver.DeleteMessagesUntilOffset(dr.Topic, dr.Offset)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go → internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/Suhaibinator/SuhaibMessageQueue/config"
"github.com/Suhaibinator/SuhaibMessageQueue/errors"
"github.com/Suhaibinator/SuhaibMessageQueue/internal/server/database"
pb "github.com/Suhaibinator/SuhaibMessageQueue/proto"
"github.com/Suhaibinator/SuhaibMessageQueue/server/database"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
Expand Down
Loading