
Distributed Task Queue (TaskFlow)

Docker Kubernetes FastAPI Python Redis PostgreSQL CI/CD

Overview

TaskFlow is a robust, production-ready distributed task queue system built to handle asynchronous background processing at scale. Designed with a microservices architecture, it reliably manages, schedules, and executes jobs across multiple concurrent worker nodes with built-in auto-scaling capabilities.

The system leverages FastAPI for high-performance task submission and monitoring, Redis for efficient message brokering and state management, and PostgreSQL for durable persistence of task history and results. By implementing the "Competing Consumers" pattern, TaskFlow ensures load balancing and fault tolerance—if one worker fails, others seamlessly pick up the load.
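The Competing Consumers idea can be sketched in a few lines: several workers pull from one shared queue, so load balances naturally and a stalled worker never blocks the rest. This toy version uses an in-memory queue and threads to stay self-contained; TaskFlow itself brokers tasks through Redis across separate processes.

```python
import queue
import threading

# Toy illustration of the "Competing Consumers" pattern: workers compete
# for items on one shared queue. (TaskFlow uses Redis as the broker; the
# in-memory queue here just keeps the sketch runnable on its own.)
tasks = queue.Queue()
results = []
lock = threading.Lock()

def worker(worker_id: int) -> None:
    while True:
        try:
            task = tasks.get(timeout=0.5)  # compete for the next task
        except queue.Empty:
            return  # queue drained, worker exits cleanly
        with lock:
            results.append((worker_id, task))
        tasks.task_done()

for i in range(10):
    tasks.put(f"task-{i}")

threads = [threading.Thread(target=worker, args=(w,)) for w in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))  # prints 10: every task processed exactly once
```

If one worker thread died mid-run, the remaining two would still drain the queue, which is the fault-tolerance property described above.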

Key Capabilities:

  • Distributed Processing: Horizontally scalable worker nodes process tasks in parallel
  • Reliable Scheduling: Intelligent task distribution with Redis-based locking to prevent race conditions
  • Production Ready: Docker Compose and Kubernetes deployments with automated CI/CD pipelines
  • Persistent & Observable: Complete audit trail of task states (Queued → Processing → Completed/Failed) via REST endpoints
  • Auto-Scaling: Kubernetes HPA (Horizontal Pod Autoscaler) support for dynamic worker scaling based on queue depth
  • Continuous Integration: Automated end-to-end stress testing with 200+ concurrent tasks validation
  • Interactive Showcase: Live demo website with project overview and system architecture
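The audit trail above follows a small state machine (Queued → Processing → Completed/Failed). A minimal sketch of that lifecycle, with hypothetical names that may differ from TaskFlow's actual models:

```python
from enum import Enum

# Hypothetical sketch of the task lifecycle described above; the real
# TaskFlow models may use different names and states.
class TaskState(str, Enum):
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

# Legal transitions: Queued -> Processing -> Completed or Failed
TRANSITIONS = {
    TaskState.QUEUED: {TaskState.PROCESSING},
    TaskState.PROCESSING: {TaskState.COMPLETED, TaskState.FAILED},
    TaskState.COMPLETED: set(),   # terminal
    TaskState.FAILED: set(),      # terminal
}

def advance(current: TaskState, nxt: TaskState) -> TaskState:
    """Move a task to its next state, rejecting illegal jumps."""
    if nxt not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {nxt.value}")
    return nxt
```

Guarding transitions like this is what makes the REST-exposed history trustworthy: a task can never appear Completed without having passed through Processing.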

See TaskFlow in Action

taskflow-io.vercel.app - Interactive demonstration with project overview, architecture, and demo videos

TaskFlow Scaling Proof

TaskFlow Auto-Scaling Demo

Quick Deployment (Docker)

Deploy TaskFlow in production using Docker Compose.

One-Line Install

curl -sSL https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/install.sh | bash

Manual Install

  1. Download Configuration:
curl -O https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/docker-compose.prod.yml
curl -O https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/.env.production.example
curl -O https://raw.githubusercontent.com/dhruvkshah75/TaskFlow/main/scripts/setup-production.sh
chmod +x setup-production.sh
  2. Generate Credentials & Deploy:
./setup-production.sh
docker compose --env-file .env.production -f docker-compose.prod.yml up -d
  3. Verify:
curl http://localhost:8000/status

CI/CD & Testing

TaskFlow includes automated continuous integration with comprehensive end-to-end testing:

Automated Stress Testing

  • Workflow: Triggered on every push to main (excluding README changes)
  • Test Coverage: Deploys full Kubernetes stack with 200+ concurrent task submissions
  • Validation: Automated verification of task processing, database persistence, and worker scaling
  • Caching: Docker layer caching for faster builds (~50% reduction in CI time)
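The stress test's concurrency can be sketched with a thread pool driving many submissions at once. The `submit_task` stand-in below is hypothetical; the real `stress-test.py` presumably POSTs each task to the FastAPI endpoint instead.

```python
from concurrent.futures import ThreadPoolExecutor

# Sketch of driving 200 concurrent task submissions, in the spirit of
# stress-test.py. submit_task is a stand-in for an HTTP POST to the
# (assumed) task-submission endpoint.
def submit_task(task_id: int) -> str:
    return f"accepted:{task_id}"

# 50 threads push 200 submissions through as fast as the API accepts them
with ThreadPoolExecutor(max_workers=50) as pool:
    outcomes = list(pool.map(submit_task, range(200)))

assert all(o.startswith("accepted:") for o in outcomes)
print(f"{len(outcomes)} tasks submitted")  # prints: 200 tasks submitted
```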

Run Tests Locally

# Submit 200 concurrent tasks to stress test the system
python stress-test.py

# Verify all tasks were processed correctly
python verify_jobs.py

View CI/CD workflows in .github/workflows/:

  • ci.yaml - Standard integration tests
  • ci-caching.yaml - Optimized workflow with Docker and pip caching
  • delivery.yaml - Complete CI/CD pipeline with parallel jobs that build Docker images, publish them to Docker Hub and the GHCR registry, and run end-to-end tests

Kubernetes Deployment

Deploy TaskFlow to a local Minikube cluster or a production Kubernetes environment.

Prerequisites

  • Kubernetes Cluster (Minikube, EKS, GKE, etc.)
  • kubectl CLI tool

1. Secrets Management (Important)

Note: The secrets.yaml file is gitignored to prevent sensitive data leaks. You must create it manually or generate it from your environment variables.

Option A: Generate from .env (Recommended)

If you have a local .env file with your credentials:

kubectl create secret generic taskflow-secrets \
  --from-env-file=.env \
  --dry-run=client -o yaml > k8s/secrets.yaml

Option B: Manual Creation

Create a file named k8s/secrets.yaml using the template below. You must base64 encode your values (e.g., echo -n "mypassword" | base64).

apiVersion: v1
kind: Secret
metadata:
  name: taskflow-secrets
  namespace: taskflow
type: Opaque
data:
  DATABASE_URL: <base64-encoded-url>
  SECRET_KEY: <base64-encoded-key>
  REDIS_PASSWORD: <base64-encoded-password>
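Equivalent to the `echo -n "mypassword" | base64` one-liner, the encoded values for the template can also be generated with Python's `base64` module. The values below are placeholders only; substitute your real credentials.

```python
import base64

# Kubernetes Secret `data` values must be base64-encoded.
# Example/placeholder values only -- use your real credentials.
secrets = {
    "DATABASE_URL": "postgresql://user:pass@localhost:5432/taskflow",
    "SECRET_KEY": "replace_with_secure_key",
    "REDIS_PASSWORD": "mypassword",
}

for key, value in secrets.items():
    encoded = base64.b64encode(value.encode()).decode()
    print(f"  {key}: {encoded}")
```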

2. Deploying the Stack

# Commands to create a secrets.yaml file
kubectl create secret generic taskflow-db-secret \
  --namespace=taskflow \
  --from-literal=POSTGRES_DB=taskflow_db \
  --from-literal=POSTGRES_USER=postgres \
  --from-literal=POSTGRES_PASSWORD=password \
  --from-literal=DATABASE_URL=postgresql://postgres:password@taskflow-pgbouncer:6432/taskflow_db \
  --dry-run=client -o yaml > secrets.yaml
echo "---" >> secrets.yaml
kubectl create secret generic taskflow-redis-secret \
  --namespace=taskflow \
  --from-literal=REDIS_PASSWORD=test_password \
  --from-literal=REDIS_HOST_HIGH=redis-high \
  --from-literal=REDIS_PORT_HIGH=6379 \
  --from-literal=REDIS_HOST_LOW=redis-low \
  --from-literal=REDIS_PORT_LOW=6379 \
  --dry-run=client -o yaml >> secrets.yaml
echo "---" >> secrets.yaml
kubectl create secret generic taskflow-app-secret \
  --namespace=taskflow \
  --from-literal=SECRET_KEY=test_secret_key_for_ci_only \
  --from-literal=ALGORITHM=HS256 \
  --from-literal=ACCESS_TOKEN_EXPIRE_MINUTES=60 \
  --dry-run=client -o yaml >> secrets.yaml


# Apply the kubernetes manifests 
kubectl apply -f k8s/ --recursive

# Verify Pods
kubectl get pods -n taskflow -w

# For port forwarding 
kubectl port-forward -n taskflow svc/taskflow-api 8080:80
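For the advertised HPA-based worker scaling, a manifest along these lines could live in `k8s/autoscaling/`. The resource names here are assumed, and this baseline example scales on CPU utilization; scaling on queue depth specifically requires an external/custom metrics adapter such as KEDA or prometheus-adapter.

```yaml
# Illustrative worker HPA; Deployment name and thresholds are assumptions.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: taskflow-worker-hpa
  namespace: taskflow
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: taskflow-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```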

3. Accessing the API

If using Minikube, you need to tunnel the service to access it:

# Start the Minikube cluster
minikube start

# In a separate terminal, keep a tunnel open for LoadBalancer services
minikube tunnel

# Or port-forward the API service directly
kubectl port-forward -n taskflow svc/taskflow-api 8080:80

# Open http://localhost:8080/docs in your browser

Development Setup

If you want to develop or contribute to TaskFlow, follow these steps:

Option 1: Docker Development (Recommended)

git clone https://github.com/dhruvkshah75/TaskFlow.git
cd TaskFlow
cp .env.docker.example .env.docker

# Start services
docker compose build
docker compose up -d

# Run migrations
docker compose exec api alembic upgrade head

Option 2: Local Development (No Docker)

# Setup Python Env
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Configure Env
cp .env.example .env
# (Edit .env with your local DB/Redis credentials)

# Run
uvicorn api.main:app --reload
python -m worker.main

Repository Structure

TaskFlow/
├── .github/workflows/               # CI/CD Pipelines
│   ├── ci.yaml                      # Standard integration tests
│   ├── delivery.yaml                # Complete CI/CD pipeline
│   └── ci-caching.yaml              # Optimized builds with caching
├── Deployment
│   ├── docker-compose.prod.yml      # Production Docker Compose
│   └── k8s/                         # Kubernetes Manifests
│       ├── apps/                    # API, Worker, Queue Manager
│       ├── infrastructure/          # Redis, Postgres, PgBouncer
│       ├── autoscaling/
│       ├── 02-configmaps.yaml       # Application configuration
│       └── secrets.yaml.example     # Secret templates (gitignored)
├── API Service
│   └── api/                         # FastAPI application
│       ├── routers/                 # REST endpoints
│       ├── main.py                  # Application entry point
│       └── Dockerfile
├── Core Services
│   └── core/                        # Shared utilities
│       ├── database.py              # PostgreSQL connection
│       ├── redis_client.py          # Redis client wrapper
│       ├── queue_manager.py         # Queue management logic
│       ├── config.py                # Environment configuration
│       └── Dockerfile
├── Worker Service
│   └── worker/                      # Task execution engine
│       ├── main.py                  # Worker process
│       ├── heartbeat.py             # Periodic worker heartbeat
│       └── tasks.py                 # Example task definitions
├── alembic/                         # Database migrations
├── public/                          # Project website (Vercel)
│   ├── index.html                   # Interactive demo page
│   ├── architecture.html            # System architecture walkthrough
│   └── assets/                      # Demo GIFs and illustrations
├── stress-test.py                   # Load testing (200 tasks)
└── verify_jobs.py                   # CI/CD validation script

Example .env Configuration

Create a .env file in the root directory:

# Database Configuration
DATABASE_URL=postgresql://user:pass@localhost:5432/taskflow

# Security
SECRET_KEY=replace_with_secure_key
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=180

# Redis Configuration
# Note: In K8s, these are overridden by env vars in the deployment YAMLs
REDIS_HOST_HIGH=redis_high
REDIS_PORT_HIGH=6379
REDIS_HOST_LOW=redis_low
REDIS_PORT_LOW=6379
REDIS_PASSWORD=your_redis_password

# App Settings
RATE_LIMIT_PER_HOUR=1000
HEARTBEAT_INTERVAL_SECONDS=30
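These variables are consumed by `core/config.py`. A minimal sketch of how such a settings object might read them, with safe defaults matching the sample above; the real implementation may use `pydantic-settings` or `python-dotenv` instead.

```python
import os
from dataclasses import dataclass

# Hypothetical sketch of environment-driven configuration in the spirit
# of core/config.py; the actual field names and loader may differ.
@dataclass(frozen=True)
class Settings:
    database_url: str = os.environ.get("DATABASE_URL", "")
    secret_key: str = os.environ.get("SECRET_KEY", "")
    algorithm: str = os.environ.get("ALGORITHM", "HS256")
    token_expire_minutes: int = int(
        os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", "180")
    )
    redis_host_high: str = os.environ.get("REDIS_HOST_HIGH", "redis_high")
    redis_port_high: int = int(os.environ.get("REDIS_PORT_HIGH", "6379"))
    rate_limit_per_hour: int = int(os.environ.get("RATE_LIMIT_PER_HOUR", "1000"))

settings = Settings()
print(settings.redis_host_high, settings.redis_port_high)
```

Freezing the dataclass keeps configuration immutable after startup, so every service process sees one consistent view of its environment.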

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License - see the LICENSE file for details.