High-performance Kafka consumer for GPS tracking data processing with Protobuf support.
- 🚀 High Performance: 5-10x faster than the Python version
- 🔄 Async Processing: True parallel processing with Tokio
- 🛡️ Type Safety: Compile-time error prevention
- 📊 Real-time: Streams data for downstream POI and Geofence processing
- 🔌 Reliable: Auto-reconnection for Kafka
- 📈 Scalable: Handles thousands of messages per second
- 🔧 Flexible: Works with Kafka-compatible brokers (Apache Kafka, Redpanda)
- 📦 Protocol Buffers: Native Protobuf support for Kafka messages
# 1. Clone and build
cargo build --release
# 2. Set environment variables
## For Kafka/Redpanda:
export BROKER_TYPE="kafka"
export BROKER_HOST="localhost:9092"
export BROKER_TOPIC="siscom-messages"
## Database:
export DB_HOST="localhost"
export DB_PORT="5432"
export DB_DATABASE="tracking"
export DB_USERNAME="user"
export DB_PASSWORD="password"
export RUST_LOG="info"
# ... (see Configuration section for all variables)
# 3. Run
cargo run --release

Kafka → Consumer → PostgreSQL
The consumer reads from:
- Kafka/Redpanda: Modern streaming platform with Protobuf-encoded messages
Tracking Consumer Rust is a high-performance service designed to consume GPS tracking data from the Kafka/Redpanda streaming platform and forward the raw data to PostgreSQL for further use and analytics. The logic for Points of Interest (POI) and Geofence evaluation is handled by a separate microservice that consumes from Kafka.
The consumer supports:
- Kafka Mode: Modern streaming platform with Protocol Buffer messages
1. Startup & Configuration
- Loads configuration from environment variables.
- Initializes logging and prepares for graceful shutdown.
2. Service Initialization
- Connects to PostgreSQL (for persistent storage).
- Configures the Kafka producer (for streaming data).
- Sets up the Kafka consumer (to receive GPS messages).
- Initializes the message processor (handles batching and dispatch).
3. Main Processing Loop
- Starts the Kafka consumer in the background, which receives messages and pushes them to an internal channel.
- The message processor consumes messages from the channel and batches results.
- Data is sent to PostgreSQL.
- Health checks and statistics are periodically logged.
4. Graceful Shutdown
- On receiving a shutdown signal, the application flushes all buffers, closes Kafka connections, and ensures all data is persisted.
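The loop described above can be pictured as a consumer task feeding an internal channel while a processor task drains it in batches. Below is a minimal sketch of that pattern with Tokio; the `TrackingMessage` type, the batch size, and the flush logic are illustrative assumptions, not the project's actual implementation.

```rust
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

// Hypothetical decoded GPS message; the real service works with Protobuf-generated types.
#[derive(Debug)]
struct TrackingMessage {
    device_id: String,
    lat: f64,
    lon: f64,
}

#[tokio::main]
async fn main() {
    // Internal channel between the Kafka consumer task and the message processor.
    let (tx, mut rx) = mpsc::channel::<TrackingMessage>(10_000);

    // Consumer task: in the real service this reads from Kafka and decodes Protobuf payloads.
    tokio::spawn(async move {
        for i in 0..250 {
            let msg = TrackingMessage {
                device_id: format!("dev-{i}"),
                lat: 4.60,
                lon: -74.08,
            };
            if tx.send(msg).await.is_err() {
                break; // processor has shut down
            }
        }
        // Dropping tx closes the channel and lets the processor flush and exit gracefully.
    });

    // Processor task: accumulate messages and flush on batch size or on a timeout.
    let mut batch: Vec<TrackingMessage> = Vec::with_capacity(100);
    let mut tick = interval(Duration::from_millis(100));
    loop {
        tokio::select! {
            maybe_msg = rx.recv() => match maybe_msg {
                Some(msg) => {
                    batch.push(msg);
                    if batch.len() >= 100 {
                        println!("flushing {} messages to PostgreSQL", batch.len());
                        batch.clear();
                    }
                }
                None => {
                    // Channel closed (shutdown): flush whatever is left and exit.
                    println!("final flush of {} messages", batch.len());
                    break;
                }
            },
            _ = tick.tick() => {
                if !batch.is_empty() {
                    println!("timeout flush of {} messages", batch.len());
                    batch.clear();
                }
            }
        }
    }
}
```

Flushing on either batch size or a timeout keeps latency bounded when traffic is low, which is the intent behind the `PROCESSING_BATCH_PROCESSING_SIZE` and `KAFKA_BATCH_TIMEOUT_MS` settings described in the Configuration section.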
```mermaid
flowchart TD
    A[Kafka/Redpanda] -->|Protobuf Messages| B(Kafka Consumer)
    B --> C[Internal Channel]
    C --> D[Message Processor]
    D --> E[Kafka Producer]
    D --> F[PostgreSQL]
    E --> G[Kafka Broker]
    F --> H[Database]
    G --> I[POI/Geofence Microservice]
```
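In the diagram, the message processor writes batches to PostgreSQL (D → F). A minimal sketch of a batched insert with sqlx (postgres + runtime-tokio features) is shown below; the `tracking_positions` table, its columns, and the connection string are illustrative assumptions, not the project's actual schema.

```rust
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;

// Illustrative row shape; the real schema is defined by the project's migrations.
struct PositionRow {
    device_id: String,
    lat: f64,
    lon: f64,
}

// Insert a whole batch inside one transaction so a failure never leaves partial data behind.
async fn flush_batch(pool: &PgPool, batch: &[PositionRow]) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    for row in batch {
        sqlx::query("INSERT INTO tracking_positions (device_id, lat, lon) VALUES ($1, $2, $3)")
            .bind(&row.device_id)
            .bind(row.lat)
            .bind(row.lon)
            .execute(&mut *tx)
            .await?;
    }
    tx.commit().await
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Connection parameters come from the DB_* environment variables described in the Configuration section.
    let pool = PgPoolOptions::new()
        .max_connections(20)
        .connect("postgres://user:password@localhost:5432/tracking")
        .await?;

    let batch = vec![PositionRow { device_id: "dev-1".into(), lat: 4.60, lon: -74.08 }];
    flush_batch(&pool, &batch).await
}
```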
- Throughput: 1000-5000 msg/s
- Memory: ~10-20 MB
- Latency: <10ms processing time
- CPU: ~5-15% on modern hardware
The application is configured entirely through environment variables. A template file .env.template is provided with all available options and examples.
1. Copy the template:
   cp .env.template .env
2. Edit .env with your values:
   # Edit the .env file with your preferred editor
   nano .env
3. Run with environment file:
   # Option 1: Source the .env file
   set -a && source .env && set +a && cargo run --release
   # Option 2: Use a tool like dotenv
   # (The application reads from environment variables directly)
- `BROKER_TYPE` - Required. Broker type: `"kafka"`
- `BROKER_HOST` - Required. Broker connection string
  - For Kafka: `host:port` (e.g., `localhost:9092` or `redpanda:9092`)
- `BROKER_TOPIC` - Required. Topic to consume from (default: `siscom-messages`)
- `KAFKA_BATCH_SIZE` - Batch size for producer (default: 100)
- `KAFKA_BATCH_TIMEOUT_MS` - Batch timeout in ms (default: 100)
- `KAFKA_COMPRESSION` - Compression type: `snappy`, `gzip`, etc. (default: snappy)
- `KAFKA_RETRIES` - Number of retries (default: 3)
- `KAFKA_SECURITY_PROTOCOL` - Security protocol (e.g., `SASL_PLAINTEXT`, `SASL_SSL`)
- `KAFKA_SASL_MECHANISM` - SASL mechanism (e.g., `SCRAM-SHA-256`, `PLAIN`)
- `KAFKA_USERNAME` - SASL username for authentication
- `KAFKA_PASSWORD` - SASL password for authentication
- `DB_HOST` - PostgreSQL hostname
- `DB_PORT` - PostgreSQL port (default: 5432)
- `DB_DATABASE` - Database name
- `DB_USERNAME` - Database username
- `DB_PASSWORD` - Database password
- `DB_MAX_CONNECTIONS` - Maximum connections (default: 20)
- `DB_MIN_CONNECTIONS` - Minimum connections (default: 5)
- `DB_CONNECTION_TIMEOUT_SECS` - Connection timeout (default: 30)
- `DB_IDLE_TIMEOUT_SECS` - Idle timeout (default: 600)
- `PROCESSING_WORKER_THREADS` - Number of worker threads (default: 4)
- `PROCESSING_MESSAGE_BUFFER_SIZE` - Message buffer size (default: 10000)
- `PROCESSING_BATCH_PROCESSING_SIZE` - Batch processing size (default: 100)
- `PROCESSING_MAX_PARALLEL_DEVICES` - Max parallel devices (default: 50)
- `RUST_LOG` - Log level (e.g., "info", "debug", "warn", "error")
- `LOGGING_FILE_PATH` - Log file path (optional)
- `LOGGING_MAX_FILE_SIZE_MB` - Max log file size in MB (default: 100)
- `LOGGING_MAX_FILES` - Max number of log files (default: 10)
- `LOGGING_JSON_FORMAT` - Use JSON format for logs (default: true)
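A hedged sketch of how variables like these can be read with sensible defaults is shown below; the `env_or` helper and the `ProcessingConfig` struct are illustrative, not the application's actual configuration types.

```rust
use std::env;

// Illustrative config struct; the real application defines its own configuration types.
#[derive(Debug)]
struct ProcessingConfig {
    worker_threads: usize,
    message_buffer_size: usize,
    batch_processing_size: usize,
}

// Read an environment variable, falling back to a default when it is unset or unparsable.
fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
    env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}

fn main() {
    let cfg = ProcessingConfig {
        worker_threads: env_or("PROCESSING_WORKER_THREADS", 4),
        message_buffer_size: env_or("PROCESSING_MESSAGE_BUFFER_SIZE", 10_000),
        batch_processing_size: env_or("PROCESSING_BATCH_PROCESSING_SIZE", 100),
    };
    println!("{cfg:?}");
}
```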
export BROKER_TYPE=kafka
export BROKER_HOST=localhost:9092
export BROKER_TOPIC=siscom-messages

- Connects to Kafka/Redpanda streaming platforms
- Consumes Protocol Buffer messages from Kafka topics
- Higher throughput and better scalability for large deployments
Messages follow the siscom.proto schema with fields for:
- Device identification and location data
- Normalized/homogenized tracking information
- Vendor-specific decoded data (Suntech/Queclink)
- Message metadata (timestamps, client info)
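For orientation, here is a minimal encode/decode round-trip with prost; `SiscomMessage` and its fields are illustrative stand-ins for the types actually generated from siscom.proto.

```rust
use prost::Message;

// Illustrative message type; the real project generates this from siscom.proto with prost-build.
#[derive(Clone, PartialEq, prost::Message)]
pub struct SiscomMessage {
    #[prost(string, tag = "1")]
    pub device_id: String,
    #[prost(double, tag = "2")]
    pub latitude: f64,
    #[prost(double, tag = "3")]
    pub longitude: f64,
}

fn main() -> Result<(), prost::DecodeError> {
    // Round-trip: encode a message, then decode it the way the consumer decodes a Kafka payload.
    let original = SiscomMessage { device_id: "dev-1".into(), latitude: 4.60, longitude: -74.08 };
    let bytes = original.encode_to_vec();
    let decoded = SiscomMessage::decode(bytes.as_slice())?;
    assert_eq!(original, decoded);
    Ok(())
}
```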
📖 For detailed information on serialization and deserialization, see docs/serialization-guide.md
1. Install Rust:
   curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
   source $HOME/.cargo/env
2. Clone and setup:
   git clone <repository-url>
   cd siscom-consumer
   cp .env.template .env
   # Edit .env with your configuration
3. Build and run:
   # Build
   cargo build --release
   # Run with debug logs
   RUST_LOG=debug cargo run
   # Run tests
   cargo test
   # Format code
   cargo fmt
   # Check code quality
   cargo clippy
The project includes a complete Docker Compose setup with infrastructure services:
For Kafka mode:
# Copy and edit environment
cp .env.template .env
# Edit .env to set BROKER_TYPE=kafka
# Run with Kafka (Redpanda) infrastructure
docker-compose --profile kafka up

Available services:
- siscom-consumer: Main application
- postgres: PostgreSQL database
- redpanda: Kafka-compatible streaming platform (Kafka profile)
# Build container
docker build -t siscom-consumer .
# Run with environment file
docker run --env-file .env siscom-consumer
# Run with specific environment variables
docker run -e BROKER_TYPE=kafka -e BROKER_HOST=host.docker.internal:9092 siscom-consumer

Development configuration:
export BROKER_TYPE=kafka
export BROKER_HOST=localhost:9092
export BROKER_TOPIC=test-messages
export DB_HOST=localhost
export DB_DATABASE=tracking_dev
export RUST_LOG=debug

Docker Compose configuration:
export BROKER_TYPE=kafka
export BROKER_HOST=redpanda:9092
export BROKER_TOPIC=siscom-messages
export DB_HOST=postgres
export DB_DATABASE=tracking
export RUST_LOG=info

Production configuration:
export BROKER_TYPE=kafka
export BROKER_HOST=kafka-cluster.company.com:9092
export BROKER_TOPIC=tracking-events
export DB_HOST=postgres-prod.company.com
export DB_DATABASE=tracking_prod
export RUST_LOG=warn

Production configuration with SASL authentication:
export BROKER_TYPE=kafka
export BROKER_HOST=kafka-cluster.company.com:9092
export BROKER_TOPIC=tracking-events
export KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
export KAFKA_SASL_MECHANISM=SCRAM-SHA-256
export KAFKA_USERNAME=siscom-consumer
export KAFKA_PASSWORD=your-secure-password
export DB_HOST=postgres-prod.company.com
export DB_DATABASE=tracking_prod
export RUST_LOG=warn
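With rdkafka, those SASL variables typically map onto librdkafka properties as sketched below; the `group.id` value and the exact set of properties are illustrative assumptions, not the project's actual client setup.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;
use std::env;

// Build a consumer, applying SASL settings only when the corresponding variables are present.
fn build_consumer() -> Result<StreamConsumer, rdkafka::error::KafkaError> {
    let mut cfg = ClientConfig::new();
    cfg.set("bootstrap.servers", env::var("BROKER_HOST").unwrap_or_default())
        .set("group.id", "siscom-consumer") // illustrative group id
        .set("auto.offset.reset", "earliest");

    if let Ok(protocol) = env::var("KAFKA_SECURITY_PROTOCOL") {
        cfg.set("security.protocol", protocol);
    }
    if let Ok(mechanism) = env::var("KAFKA_SASL_MECHANISM") {
        cfg.set("sasl.mechanism", mechanism);
    }
    if let Ok(user) = env::var("KAFKA_USERNAME") {
        cfg.set("sasl.username", user);
    }
    if let Ok(pass) = env::var("KAFKA_PASSWORD") {
        cfg.set("sasl.password", pass);
    }
    cfg.create()
}

fn main() {
    match build_consumer() {
        Ok(_consumer) => println!("consumer configured"),
        Err(err) => eprintln!("failed to configure consumer: {err}"),
    }
}
```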
"Connection refused" for Kafka:
- Verify Kafka brokers are running and accessible
- Check network connectivity and firewall rules
- Ensure topic exists:
kafka-topics --create --topic siscom-messages --bootstrap-server localhost:9092
Protobuf decode errors:
- Verify message format matches the `siscom.proto` schema
- Check message serialization in producer applications
- Enable debug logging: `RUST_LOG=debug`
The application provides health checks and metrics:
- Health endpoint: Application logs connection status every 30 seconds
- Metrics: DB buffer size, Kafka buffer size, batch statistics logged every 60 seconds
- Logs: Structured JSON logs (configurable) with detailed error information
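The periodic statistics logging can be sketched as a background Tokio task; the counter below is a hypothetical stand-in for the real buffer and batch metrics.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::time::{interval, sleep, Duration};

// Spawn a background task that logs hypothetical buffer statistics on a fixed interval.
fn spawn_stats_reporter(db_buffer: Arc<AtomicU64>, period: Duration) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut tick = interval(period);
        loop {
            tick.tick().await;
            println!("stats: db_buffer_size={}", db_buffer.load(Ordering::Relaxed));
        }
    })
}

#[tokio::main]
async fn main() {
    let db_buffer = Arc::new(AtomicU64::new(0));
    // Production would use Duration::from_secs(60); a short period keeps the demo quick.
    let reporter = spawn_stats_reporter(Arc::clone(&db_buffer), Duration::from_millis(200));

    db_buffer.fetch_add(17, Ordering::Relaxed);
    sleep(Duration::from_millis(500)).await;
    reporter.abort();
}
```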
- Fork the repository
- Create a feature branch: `git checkout -b feature/amazing-feature`
- Make your changes and add tests
- Run tests: `cargo test`
- Format code: `cargo fmt && cargo clippy`
- Commit your changes: `git commit -m 'Add amazing feature'`
- Push to the branch: `git push origin feature/amazing-feature`
- Open a Pull Request
- Rust Edition: 2021
- Formatting: `cargo fmt`
- Linting: `cargo clippy`
- Testing: `cargo test` with >80% coverage
- Documentation: All public APIs documented
- Error Handling: Use `anyhow` for application errors, `thiserror` for library errors
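A compact illustration of that convention, with a hypothetical error type; it sketches the split between typed library errors and context-rich application errors, not the project's actual error handling.

```rust
use anyhow::Context;

// Library-style error type: thiserror gives precise, typed variants.
#[derive(Debug, thiserror::Error)]
enum DecodeError {
    #[error("payload is empty")]
    Empty,
    #[error("payload too short: {0} bytes")]
    TooShort(usize),
}

// A toy "decoder" that returns the first byte of a payload.
fn decode(payload: &[u8]) -> Result<u8, DecodeError> {
    match payload {
        [] => Err(DecodeError::Empty),
        [_] => Err(DecodeError::TooShort(1)),
        [first, ..] => Ok(*first),
    }
}

// Application-style code: anyhow wraps any error and adds context on the way up.
fn main() -> anyhow::Result<()> {
    let first = decode(&[0x08, 0x01]).context("decoding tracking payload")?;
    println!("first byte: {first}");
    Ok(())
}
```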
For developers who need to integrate with the SISCOM Consumer system, see the complete serialization guide:
📖 Serialization and Deserialization Guide
This guide includes:
- Complete Protocol Buffers schema
- Serialization examples in multiple languages (Rust, Python, JavaScript)
- Message formats for Suntech and Queclink devices
- Apache Kafka integration
- Validations and best practices
- Use Docker secrets or Kubernetes secrets for sensitive data
- Set `RUST_LOG=warn` or `RUST_LOG=error` in production
- Configure proper database connection pooling
- Enable JSON logging: `LOGGING_JSON_FORMAT=true`
- Configure log aggregation (ELK, Loki, etc.)
- Set up health checks and metrics collection
- Monitor Kafka consumer lag and database connections
# High-throughput settings
export PROCESSING_MESSAGE_BUFFER_SIZE=50000
export PROCESSING_BATCH_PROCESSING_SIZE=500
export PROCESSING_WORKER_THREADS=8
export DB_MAX_CONNECTIONS=50
# Kafka optimization
export KAFKA_BATCH_SIZE=1000
export KAFKA_BATCH_TIMEOUT_MS=50

- Configure TLS for all connections
- Use managed databases with proper security groups
- Rotate credentials regularly
Example deployment manifest:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: siscom-consumer
spec:
  replicas: 3
  template:
    spec:
      containers:
        - name: siscom-consumer
          image: siscom-consumer:latest
          envFrom:
            - secretRef:
                name: siscom-secrets
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"

This project is licensed under the MIT License - see the LICENSE file for details.