TaskFlow is a robust, distributed task queue system built to handle asynchronous background processing at scale. Designed with a microservices architecture, it reliably manages, schedules, and executes jobs across multiple concurrent worker nodes.
The system leverages FastAPI for high-performance task submission and monitoring, Redis for efficient message brokering and state management, and PostgreSQL for durable persistence of task history and results. By implementing the "Competing Consumers" pattern, TaskFlow ensures load balancing and fault tolerance—if one worker fails, others seamlessly pick up the load.
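The Competing Consumers pattern can be sketched with Python's standard library alone: several workers pull from one shared queue, each job is delivered to exactly one consumer, and remaining workers keep draining the queue if any single worker stops. (In TaskFlow itself the broker is Redis; this in-memory simulation only illustrates the pattern.)

```python
import queue
import threading

# Shared job queue: stands in for a Redis list in this sketch.
jobs: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()

def worker(name: str) -> None:
    # Each worker competes for the next available job; the queue
    # guarantees a job is handed to exactly one consumer.
    while True:
        try:
            job = jobs.get(timeout=0.2)
        except queue.Empty:
            return  # queue drained, worker exits
        results.put((name, job * 2))  # pretend "processing"
        jobs.task_done()

for i in range(10):
    jobs.put(i)

threads = [threading.Thread(target=worker, args=(f"worker-{n}",)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every job was processed exactly once, regardless of which worker got it.
processed = sorted(r for _, r in results.queue)
```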
Key Capabilities:
- Distributed Processing: Horizontally scalable worker nodes process tasks in parallel.
- Reliable Scheduling: Intelligent task distribution with Redis-based locking to prevent race conditions.
- Production Ready: Supports both Docker Compose and Kubernetes deployments.
- Persistent & Observable: Complete audit trail of task states (Queued → Processing → Completed/Failed) via REST endpoints.
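The audited lifecycle (Queued → Processing → Completed/Failed) is a small state machine. A minimal sketch of the legal transitions (class and function names here are illustrative, not TaskFlow's actual code):

```python
from enum import Enum

class TaskState(Enum):
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

# Legal transitions in the lifecycle described above; terminal
# states (Completed/Failed) have no outgoing transitions.
TRANSITIONS = {
    TaskState.QUEUED: {TaskState.PROCESSING},
    TaskState.PROCESSING: {TaskState.COMPLETED, TaskState.FAILED},
    TaskState.COMPLETED: set(),
    TaskState.FAILED: set(),
}

def advance(current: TaskState, new: TaskState) -> TaskState:
    # Reject anything outside the audit-trail lifecycle.
    if new not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new

state = TaskState.QUEUED
state = advance(state, TaskState.PROCESSING)
state = advance(state, TaskState.COMPLETED)
```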
Deploy TaskFlow in production using Docker Compose.

One-line install:

```bash
curl -sSL https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/install.sh | bash
```

Or deploy step by step:

- Download Configuration:

  ```bash
  curl -O https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/docker-compose.prod.yml
  curl -O https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/.env.production.example
  curl -O https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/scripts/setup-production.sh
  chmod +x setup-production.sh
  ```

- Generate Credentials & Deploy:

  ```bash
  ./setup-production.sh
  docker compose --env-file .env.production -f docker-compose.prod.yml up -d
  ```

- Verify:

  ```bash
  curl http://localhost:8000/status
  ```

Deploy TaskFlow to a local Minikube cluster or a production Kubernetes environment.
Prerequisites:

- Kubernetes Cluster (Minikube, EKS, GKE, etc.)
- `kubectl` CLI tool
Note: The `secrets.yaml` file is gitignored to prevent sensitive data leaks. You must create it manually or generate it from your environment variables.

If you have a local `.env` file with your credentials:

```bash
kubectl create secret generic taskflow-secrets \
  --from-env-file=.env \
  --dry-run=client -o yaml > k8s/secrets.yaml
```

Otherwise, create a file named `k8s/secrets.yaml` using the template below. You must base64 encode your values (e.g., `echo -n "mypassword" | base64`).
```yaml
apiVersion: v1
kind: Secret
metadata:
  name: taskflow-secrets
  namespace: taskflow
type: Opaque
data:
  DATABASE_URL: <base64-encoded-url>
  SECRET_KEY: <base64-encoded-key>
  REDIS_PASSWORD: <base64-encoded-password>
```

Alternatively, generate the file with `kubectl`:

```bash
# Command to create a secrets.yaml file
kubectl create secret generic taskflow-db-secret \
  --namespace=taskflow \
  --from-literal=POSTGRES_DB=taskflow_db \
  --from-literal=POSTGRES_USER=postgres \
  --from-literal=POSTGRES_PASSWORD=password \
  --from-literal=DATABASE_URL=postgresql://postgres:password@taskflow-pgbouncer:6432/taskflow_db \
  --dry-run=client -o yaml > secrets.yaml

echo "---" >> secrets.yaml

kubectl create secret generic taskflow-redis-secret \
  --namespace=taskflow \
  --from-literal=REDIS_PASSWORD=test_password \
  --from-literal=REDIS_HOST_HIGH=redis-high \
  --from-literal=REDIS_PORT_HIGH=6379 \
  --from-literal=REDIS_HOST_LOW=redis-low \
  --from-literal=REDIS_PORT_LOW=6379 \
  --dry-run=client -o yaml >> secrets.yaml

echo "---" >> secrets.yaml

kubectl create secret generic taskflow-app-secret \
  --namespace=taskflow \
  --from-literal=SECRET_KEY=test_secret_key_for_ci_only \
  --from-literal=ALGORITHM=HS256 \
  --from-literal=ACCESS_TOKEN_EXPIRE_MINUTES=60 \
  --dry-run=client -o yaml >> secrets.yaml
```
```bash
# 1. Create Namespace & Configs
kubectl create namespace taskflow
kubectl apply -f k8s/secrets.yaml
kubectl apply -f k8s/configmap.yaml

# 2. Deploy Infrastructure (Postgres & Redis)
kubectl apply -f k8s/postgres.yaml
kubectl apply -f k8s/redis.yaml

# 3. Deploy Application (API, Worker, Queue Manager)
kubectl apply -f k8s/api.yaml
kubectl apply -f k8s/worker.yaml
kubectl apply -f k8s/queue-manager.yaml

# 4. Verify Pods
kubectl get pods -n taskflow -w

# 5. Port forwarding
kubectl port-forward -n taskflow svc/taskflow-api 8080:80
```

If using Minikube, you need to tunnel the service to access it:

```bash
minikube tunnel
# Open http://localhost:8000/docs in your browser
```

If you want to develop or contribute to TaskFlow, follow these steps:
```bash
git clone https://github.com/dhruvkshah75/TaskFlow.git
cd TaskFlow
cp .env.docker.example .env.docker

# Start services
docker compose build
docker compose up -d

# Run migrations
docker compose exec api alembic upgrade head
```

To run the services directly on your host instead:

```bash
# Setup Python Env
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Configure Env
cp .env.example .env
# (Edit .env with your local DB/Redis credentials)

# Run
uvicorn api.main:app --reload
python -m worker.main
```

Architecture Components:

- API Service: Exposes REST endpoints for task submission and monitoring.
- Queue Manager: Distributes tasks to appropriate Redis queues.
- Workers: Scalable consumers that execute tasks.
- Redis: In-memory message broker.
- PostgreSQL: Persistent storage for users and task history.
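The configuration's separate high- and low-priority Redis endpoints imply a priority-aware consumption strategy. A common approach, sketched here in pure Python (this is illustrative, not TaskFlow's actual worker code), is to always drain the high-priority queue before touching the low-priority one:

```python
from collections import deque

high = deque(["send-alert", "charge-card"])  # high-priority jobs
low = deque(["nightly-report", "cleanup"])   # low-priority jobs

def next_job():
    # Always prefer the high-priority queue; fall back to low.
    if high:
        return high.popleft()
    if low:
        return low.popleft()
    return None  # both queues empty

order = []
while (job := next_job()) is not None:
    order.append(job)
```

With this strategy a burst of low-priority work can never delay urgent jobs, at the cost of potentially starving the low queue under sustained high-priority load.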
```
TaskFlow/
├── Deployment
│   ├── docker-compose.prod.yml   # Docker Production
│   ├── k8s/                      # Kubernetes Manifests
│   │   ├── apps/                 # API, Worker, Queue Manager
│   │   └── infrastructure/       # Redis, Postgres, Configs
│
├── API Service
│   └── api/                      # FastAPI application
│
├── Core Services
│   └── core/                     # Database, Config, Redis Client
│
├── Worker Service
│   └── worker/                   # Python Task Executors
│
└── Database
    └── alembic/                  # Migration scripts
```
Create a `.env` file in the root directory:

```env
DATABASE_URL=postgresql://user:pass@localhost:5432/taskflow
SECRET_KEY=replace_with_secure_key
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=180

# Redis Configuration
# Note: In K8s, these are overridden by env vars in the deployment YAMLs
REDIS_HOST_HIGH=redis_high
REDIS_PORT_HIGH=6379
REDIS_HOST_LOW=redis_low
REDIS_PORT_LOW=6379

# App Settings
RATE_LIMIT_PER_HOUR=1000
HEARTBEAT_INTERVAL_SECONDS=30
```
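Settings like these are typically read once at startup. A minimal sketch of how they might be loaded and type-converted (illustrative only; the `Settings` class and `load_settings` helper are not TaskFlow's actual `core` config module):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_host_high: str
    redis_port_high: int
    rate_limit_per_hour: int

def load_settings(env=os.environ) -> Settings:
    # Read each value from the environment, falling back to the
    # defaults shown in the .env example above; ports and limits
    # are converted from strings to integers.
    return Settings(
        database_url=env.get("DATABASE_URL", "postgresql://user:pass@localhost:5432/taskflow"),
        redis_host_high=env.get("REDIS_HOST_HIGH", "redis_high"),
        redis_port_high=int(env.get("REDIS_PORT_HIGH", "6379")),
        rate_limit_per_hour=int(env.get("RATE_LIMIT_PER_HOUR", "1000")),
    )

settings = load_settings({})  # empty env -> all defaults
```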

