A high-performance, fault-tolerant message broker built in Go that serves as the single source of truth for microservices, real-time systems, and event-driven architectures. Obelisk combines the reliability of Kafka with the simplicity of Redis, delivering enterprise-grade messaging with minimal operational overhead.
- Topic-based Architecture: Organize messages into logical topics with independent storage and indexing
- Binary Protocol: Efficient binary serialization (~38 bytes vs 90 for JSON) for minimal network overhead
- Dual Interface: TCP for high-performance message ingestion, HTTP for monitoring and administration
- At-least-once Delivery: Guaranteed message delivery with crash recovery support
- Batched Disk I/O: Smart batching system (size + time triggers) minimizes disk operations
- Memory-mapped Indexes: Fast message lookup using offset-to-position mapping
- Ring Buffer Caching: In-memory buffers for recent messages enable sub-millisecond reads
- Thread-safe Operations: Concurrent producers and consumers with proper mutex protection
- File Pooling: Efficient file descriptor management with automatic cleanup
- Zero Configuration: Works out of the box with sensible defaults
- Graceful Shutdown: Clean resource cleanup with proper signal handling
- Topic Auto-creation: Topics created automatically on first message
- File-based Storage: Simple, debuggable storage format with no external dependencies
- Comprehensive Health Monitoring: Kubernetes-ready health checks with component-level status
- Prometheus Metrics: Built-in monitoring and observability
- Corruption Recovery: Intelligent data corruption detection and recovery
- Retry Mechanisms: Configurable retry policies for transient failures
- Error Categorization: Sophisticated error handling with proper error types
- Connection Management: Active connection tracking and error handling
┌─────────────┐        TCP/Binary        ┌─────────────────┐
│  Producers  │ ───────────────────────▶ │ Obelisk Server  │
│             │                          │                 │
└─────────────┘                          └─────────────────┘
                                                  │
                             ┌────────────────────┼────────────────────┐
                             ▼                    ▼                    ▼
                    ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
                    │  Ring Buffers   │  │  Batch Manager  │  │  Topic Storage  │
                    │  (Fast Reads)   │  │ (Efficient I/O) │  │  (Persistence)  │
                    └─────────────────┘  └─────────────────┘  └─────────────────┘
                                                  │
                                        ┌─────────┼─────────┐
                                        ▼         ▼         ▼
                                   ┌──────────────────────────────┐
                                   │      Per-Topic Storage       │
                                   │ ┌─────────────────────────┐  │
                                   │ │   .log   ││    .idx     │  │
                                   │ │(messages)││(offset→pos) │  │
                                   │ └─────────────────────────┘  │
                                   └──────────────────────────────┘
                                                  │
                                                  ▼
                                         ┌─────────────────┐
                                         │    Consumers    │
                                         │  (Poll/Commit)  │
                                         └─────────────────┘
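The ring buffers in the diagram keep only the most recent messages in memory so that reads of fresh data can skip the disk. A minimal sketch of that idea follows; it assumes a fixed positive capacity and string payloads, and the `RingBuffer`, `Push`, and `Recent` names are hypothetical rather than taken from Obelisk's buffer package.

```go
package main

import (
	"fmt"
	"sync"
)

// RingBuffer is a hypothetical fixed-capacity buffer of recent messages.
type RingBuffer struct {
	mu    sync.RWMutex
	data  []string
	head  int // index the next Push writes to
	count int // number of valid entries, at most len(data)
}

// NewRingBuffer expects capacity > 0.
func NewRingBuffer(capacity int) *RingBuffer {
	return &RingBuffer{data: make([]string, capacity)}
}

// Push stores msg, overwriting the oldest entry once the buffer is full.
func (r *RingBuffer) Push(msg string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.data[r.head] = msg
	r.head = (r.head + 1) % len(r.data)
	if r.count < len(r.data) {
		r.count++
	}
}

// Recent returns a copy of the buffered messages, oldest first.
func (r *RingBuffer) Recent() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, r.count)
	start := (r.head - r.count + len(r.data)) % len(r.data)
	for i := 0; i < r.count; i++ {
		out = append(out, r.data[(start+i)%len(r.data)])
	}
	return out
}

func main() {
	rb := NewRingBuffer(3)
	for _, m := range []string{"a", "b", "c", "d"} {
		rb.Push(m)
	}
	fmt.Println(rb.Recent()) // prints [b c d]: "a" was overwritten
}
```

The read-write mutex lets many readers call `Recent` concurrently while writers take the exclusive lock, which mirrors the "concurrent producers and consumers" claim in spirit only.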
┌────────────────────────────────────────────────────────────────────────────────────┐
│                                   OBELISK SERVER                                   │
├────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                    │
│  Producer Message ──▶ │ TCP Handler │ ──▶ │ Ring Buffer │                          │
│                              │                                                     │
│                              └──▶ │ Topic Batcher │ ──▶ │ Disk Storage │           │
│                                                            │                       │
│                                                            │ topic-0.log           │
│                                                            │ topic-0.idx           │
│                                                            │ topic-1.log           │
│                                                            │ topic-1.idx           │
│                                                                                    │
│                                                                                    │
│  Consumer Poll Request ──▶ │ Storage Layer │ ──▶ │ Index Lookup │ ──▶ Response     │
│                                    │                                               │
│                                    └──▶ │ File Seek & Read │                       │
│                                                                                    │
└────────────────────────────────────────────────────────────────────────────────────┘
Topic: user-events
├── user-events.log (Binary Messages)
│   ┌────────────────────────────────────────┐
│   │ [Length][Timestamp][Topic][Key][Value] │ ← Message 0
│   │ [Length][Timestamp][Topic][Key][Value] │ ← Message 1
│   │ [Length][Timestamp][Topic][Key][Value] │ ← Message 2
│   └────────────────────────────────────────┘
│
└── user-events.idx (Offset Index)
    ┌─────────────────────────┐
    │ [0] → byte position 0   │
    │ [1] → byte position 156 │
    │ [2] → byte position 312 │
    └─────────────────────────┘
Consumer tracks: "I'm at offset 1" → Index lookup → Seek to byte 156 → Read from there
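The offset-to-position lookup above can be sketched in a few lines. The on-disk layout of the `.idx` file is not specified here, so this example assumes one 8-byte big-endian position per message offset; `entrySize`, `appendEntry`, and `lookup` are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// entrySize is an assumption: one big-endian uint64 byte position per offset.
const entrySize = 8

var errOffsetOutOfRange = errors.New("offset out of range")

// appendEntry records the byte position of the next message in the index.
func appendEntry(idx []byte, pos uint64) []byte {
	return binary.BigEndian.AppendUint64(idx, pos)
}

// lookup returns the byte position stored for the given message offset.
func lookup(idx []byte, offset uint64) (uint64, error) {
	if offset >= uint64(len(idx))/entrySize {
		return 0, errOffsetOutOfRange
	}
	start := offset * entrySize
	return binary.BigEndian.Uint64(idx[start : start+entrySize]), nil
}

func main() {
	// Positions taken from the user-events.idx illustration above.
	var idx []byte
	for _, pos := range []uint64{0, 156, 312} {
		idx = appendEntry(idx, pos)
	}

	pos, err := lookup(idx, 1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("offset 1 -> byte %d\n", pos) // prints "offset 1 -> byte 156"
}
```

With fixed-width entries the lookup is a constant-time slice read, and the returned position can be passed to `Seek` on the `.log` file (with `io.SeekStart`) before reading messages sequentially from there.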
- Go 1.21 or higher
- 500MB disk space (for logs and indexes)
# Clone the repository
git clone https://github.com/mush1e/obelisk.git
cd obelisk
# Install dependencies
go mod tidy
# Create data directories
mkdir -p data/topics
# Start the Obelisk server
go run cmd/Obelisk/main.go
Server starts on:
- TCP Server: :8080 (message ingestion)
- HTTP Server: :8081 (REST API, health checks, metrics)
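package main

import (
	"bufio"
	"log"
	"net"
	"time"

	"github.com/mush1e/obelisk/internal/message"
	"github.com/mush1e/obelisk/pkg/protocol"
)

func main() {
	// Connect to Obelisk server
	conn, err := net.Dial("tcp", "localhost:8080")
	if err != nil {
		log.Fatalf("connect failed: %v", err)
	}
	defer conn.Close()
	writer := bufio.NewWriter(conn)

	// Create and send message
	msg := message.Message{
		Timestamp: time.Now(),
		Topic:     "user-events",
		Key:       "user123",
		Value:     "User logged in",
	}
	msgBytes, err := message.Serialize(msg)
	if err != nil {
		log.Fatalf("serialize failed: %v", err)
	}
	protocol.WriteMessage(writer, msgBytes)
	// Flush so nothing stays in the bufio.Writer (harmless if already flushed)
	if err := writer.Flush(); err != nil {
		log.Fatalf("flush failed: %v", err)
	}

	// Read acknowledgment; the buffer is sized to fit a full "NACK:reason"
	response := make([]byte, 256)
	n, err := conn.Read(response)
	if err != nil {
		log.Fatalf("read ack failed: %v", err)
	}
	// Response: "OK\n" for success, "NACK:reason" for failure
	log.Printf("server response: %q", response[:n])
}
# Overall system health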
curl http://localhost:8081/health
# Kubernetes readiness probe
curl http://localhost:8081/health/ready
# Kubernetes liveness probe
curl http://localhost:8081/health/live
# Prometheus metrics
curl http://localhost:8081/metrics
{
  "status": "healthy",
  "timestamp": "2024-01-15T10:30:00Z",
  "uptime": "2h15m30s",
  "components": {
    "buffer": {
      "status": "healthy",
      "details": {
        "success_rate": 0.98,
        "threshold": 0.95,
        "operation_count": 15420
      },
      "last_check": "2024-01-15T10:30:00Z"
    },
    "batcher": {
      "status": "healthy",
      "details": {
        "success_rate": 0.99,
        "last_flush": "2024-01-15T10:29:55Z",
        "threshold": 0.95,
        "operation_count": 15420
      },
      "last_check": "2024-01-15T10:30:00Z"
    }
  },
  "summary": {
    "total_components": 4,
    "healthy": 4,
    "degraded": 0,
    "unhealthy": 0
  }
}
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/mush1e/obelisk/internal/consumer"
)

func main() {
	// Create consumer for topic (named c to avoid shadowing the package)
	c := consumer.NewConsumer("data/topics", "user-events")
	for {
		// Poll for new messages
		messages, err := c.Poll("user-events")
		if err != nil {
			log.Printf("poll failed: %v", err)
		}
		if len(messages) > 0 {
			// Process messages
			for _, msg := range messages {
				fmt.Printf("Processing: %s -> %s\n", msg.Key, msg.Value)
			}
			// Commit progress only after processing (enables crash recovery)
			offset, err := c.GetCurrentOffset("user-events")
			if err != nil {
				log.Printf("get offset failed: %v", err)
			} else {
				c.Commit("user-events", offset+uint64(len(messages)))
			}
		}
		time.Sleep(1 * time.Second)
	}
}
Producer Test Client:
# Send 150 messages quickly (tests batching)
go run cmd/test-client/main.go -test=size
# Send messages with delays (tests time-based flushing)
go run cmd/test-client/main.go -test=time
# Realistic workload simulation
go run cmd/test-client/main.go -test=realistic
Consumer Test Client:
# Single poll (get messages once)
go run cmd/test-consumer/main.go -topic=topic-0 -mode=poll
# Continuous polling (keep checking for new messages)
go run cmd/test-consumer/main.go -topic=topic-1 -mode=continuous
# Reset and replay from beginning
go run cmd/test-consumer/main.go -topic=topic-2 -mode=reset
Read All Messages:
# Read all topics
go run cmd/test-reader/main.go
# Read specific directory
go run cmd/test-reader/main.go /path/to/topics
# Terminal 1: Start server
go run cmd/Obelisk/main.go
# Terminal 2: Send test messages
go run cmd/test-client/main.go -test=realistic
# Terminal 3: Start consumer
go run cmd/test-consumer/main.go -topic=topic-1 -mode=continuous
# Terminal 2: Send more messages (watch consumer pick them up)
go run cmd/test-client/main.go -test=size
The server exposes comprehensive metrics at /metrics:
- Message Throughput: obelisk_messages_received_total, obelisk_messages_stored_total
- Performance: obelisk_batch_size, obelisk_flush_duration_seconds
- System Health: obelisk_health_status, obelisk_component_health
- Connection Stats: obelisk_active_connections, obelisk_connections_total
- Error Tracking: obelisk_connection_errors_total, obelisk_messages_failed_total
- Liveness Probe (/health/live): Basic service availability
- Readiness Probe (/health/ready): Service ready to accept traffic
- Health Check (/health): Comprehensive system health status
- Buffer Health: Success rate and operation count
- Batcher Health: Flush success rate and timing
- Storage Health: File system accessibility
- TCP Server Health: Connection status and listener health
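A client or alerting script can consume the component-level status from the /health response. The sketch below decodes only the `status` and `components` fields shown in the sample response earlier; the `HealthReport` and `Component` types and the `notHealthy` helper are hypothetical, and treating every status other than "healthy" as a problem is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// Component and HealthReport mirror a subset of the sample /health JSON.
type Component struct {
	Status string `json:"status"`
}

type HealthReport struct {
	Status     string               `json:"status"`
	Components map[string]Component `json:"components"`
}

// notHealthy returns the sorted names of components whose status is not
// "healthy" (assumption: any other value deserves attention).
func notHealthy(body []byte) ([]string, error) {
	var report HealthReport
	if err := json.Unmarshal(body, &report); err != nil {
		return nil, err
	}
	var names []string
	for name, c := range report.Components {
		if c.Status != "healthy" {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names, nil
}

func main() {
	// Illustrative payload; a real client would read it from GET /health.
	body := []byte(`{
		"status": "degraded",
		"components": {
			"buffer":  {"status": "healthy"},
			"batcher": {"status": "degraded"}
		}
	}`)
	names, err := notHealthy(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(names) // prints [batcher]
}
```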
┌─────────────────┐    ┌─────────────┐    ┌─────────────────┐
│     Web App     │───▶│   Obelisk   │───▶│    Analytics    │
│ (User Actions)  │    │   Topic:    │    │     Service     │
│                 │    │  "clicks"   │    │                 │
└─────────────────┘    └─────────────┘    └─────────────────┘
┌─────────────────┐    ┌─────────────┐    ┌─────────────────┐
│      Order      │───▶│   Obelisk   │───▶│    Inventory    │
│     Service     │    │   Topic:    │    │     Service     │
│                 │    │  "orders"   │    │                 │
└─────────────────┘    └─────────────┘    └─────────────────┘
                                     └───▶┌─────────────────┐
                                          │     Payment     │
                                          │     Service     │
                                          └─────────────────┘
┌─────────────────┐    ┌─────────────┐    ┌─────────────────┐
│    1000s of     │───▶│   Obelisk   │───▶│   Time Series   │
│     Sensors     │    │   Topic:    │    │    Database     │
│                 │    │  "sensors"  │    │                 │
└─────────────────┘    └─────────────┘    └─────────────────┘
obelisk/
├── cmd/
│   ├── Obelisk/main.go         # Server entry point
│   ├── test-client/main.go     # Producer test client
│   ├── test-consumer/main.go   # Consumer test client
│   └── test-reader/main.go     # Debug message reader
├── internal/
│   ├── batch/                  # Batched disk writes
│   │   └── batcher.go          # TopicBatcher with per-topic batching
│   ├── buffer/                 # Ring buffer for fast reads
│   │   ├── buffer.go           # TopicBuffers + Buffer implementation
│   │   └── buffer_test.go      # Unit tests
│   ├── consumer/               # Consumer API
│   │   └── consumer.go         # Poll/Commit/Reset functionality
│   ├── errors/                 # Error handling and categorization
│   │   └── errors.go           # Error types and retry logic
│   ├── handlers/               # HTTP request handlers
│   │   ├── health.go           # Health check endpoints
│   │   ├── messages.go         # Message REST API
│   │   ├── stats.go            # Statistics endpoints
│   │   └── middleware.go       # HTTP middleware
│   ├── health/                 # Health tracking system
│   │   ├── tracker.go          # Health status tracking
│   │   └── ring.go             # Health history ring buffer
│   ├── message/                # Message format
│   │   ├── message.go          # Binary serialization
│   │   └── message_test.go     # Serialization tests
│   ├── metrics/                # Prometheus metrics
│   │   └── metrics.go          # Metric definitions
│   ├── retry/                  # Retry mechanisms
│   │   └── retry.go            # Configurable retry policies
│   ├── server/                 # Server infrastructure
│   │   ├── server.go           # Main server orchestration
│   │   ├── tcp_server.go       # TCP message handling
│   │   └── http_server.go      # HTTP REST API
│   ├── services/               # Business logic services
│   │   └── broker_service.go   # Core broker functionality
│   └── storage/                # Persistent storage
│       ├── storage.go          # Main storage interface
│       ├── filepool.go         # File descriptor pooling
│       └── corruption_test.go  # Corruption recovery tests
├── pkg/
│   └── protocol/               # Wire protocol
│       └── protocol.go         # Length-prefixed binary protocol
├── data/topics/                # Topic storage (created at runtime)
│   ├── topic-0.log             # Message data
│   ├── topic-0.idx             # Offset index
│   ├── topic-1.log
│   └── topic-1.idx
├── go.mod
├── LICENSE
└── README.md
- Topic-based messaging
- Consumer offset tracking
- Indexed storage for fast seeks
- Batched disk I/O
- Ring buffer caching
- Binary protocol optimization
- HTTP REST API
- Comprehensive health monitoring
- Prometheus metrics
- Error categorization and retry logic
- File pooling and corruption recovery
- gRPC support for high-performance clients
- Consumer groups (multiple consumers sharing work)
- Message retention policies
- Topic compaction
- Advanced filtering and routing
- Multi-node clustering
- Leader election and consensus
- Cross-datacenter replication
- Automatic partitioning
- Load balancing
- Authentication and authorization
- TLS encryption
- Schema registry
- Dead letter queues
- Message tracing
- Backup and restore
We welcome contributions! Here's how to get started:
- Fork the repository
- Create a feature branch: git checkout -b feature/amazing-feature
- Make your changes with tests
- Run the test suite: go test ./...
- Commit your changes: git commit -m 'Add amazing feature'
- Push to the branch: git push origin feature/amazing-feature
- Open a Pull Request
# Install development dependencies
go mod tidy
# Run tests
go test ./...
# Run specific test suites
go test ./internal/storage -v
go test ./internal/buffer -v
# Start development server
go run cmd/Obelisk/main.go
This project is licensed under the MIT License - see the LICENSE file for details.
- Inspired by Apache Kafka's distributed log architecture
- Binary protocol design influenced by Redis and MessagePack
- Batching strategy adapted from RocksDB and LevelDB
- Health monitoring patterns from Kubernetes and modern microservices
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: Wiki
Obelisk: Standing tall as your distributed system's source of truth. 🗿