Skip to content

intellistream/sageLLM

Repository files navigation

sageLLM - 智能 LLM 推理调度控制平面

基于 vLLM 的高性能、智能化 LLM 推理调度管理系统

| 概述 | 核心特性 | 快速开始 | 开发设置 | 架构 | 集成指南 | 部署指南 |


概述

sageLLM 是 SAGE 项目中的 LLM 推理控制平面,提供智能请求调度、多实例管理和动态并行优化。它位于用户应用和 vLLM 执行引擎之间,负责请求的智能调度、路由和性能优化。

核心价值

Control Plane 作为 sageLLM 的核心组件,提供:

  • 智能请求调度:FIFO、优先级、SLO感知、成本优化、自适应 5 种调度算法
  • PD 分离优化:将 Prefilling 和 Decoding 请求分别路由到专门优化的实例(+50-80% 吞吐,-50-60% 延迟)
  • 多实例管理:统一管理多个 vLLM 实例,支持不同并行策略
  • 动态并行优化:自动选择最优的模型并行方案(TP、PP、DP、EP、混合)
  • 负载均衡路由:多种路由算法确保资源高效利用
  • 性能监控:实时监控和指标收集

🚀 快速开始

用户安装(使用)

# 基础安装
pip install -e .

# 运行示例
python control_plane/examples/demo_control_plane.py

开发者安装(贡献)

# 一键设置开发环境(推荐)
./setup-dev.sh

# 或手动设置
pip install -e ".[dev]"
pre-commit install

# 验证安装
pytest
pre-commit run --all-files

setup-dev.sh 会自动:

  • ✅ 检查 Python 版本 (>= 3.10)
  • ✅ 安装开发依赖
  • ✅ 安装 pre-commit hooks
  • ✅ 可选:运行代码质量检查

详见 DEVELOPMENT.md 了解完整开发指南。

📦 项目结构

sageLLM/
├── control_plane/                 # ⭐ Control Plane 核心组件
│   ├── manager.py                # 控制平面管理器 - 核心协调层
│   ├── executor.py               # 执行协调器 - vLLM 实例管理
│   ├── strategies/               # 调度策略模块
│   │   ├── base.py              # 策略基类
│   │   ├── fifo.py              # FIFO 策略
│   │   ├── priority.py          # 优先级策略
│   │   ├── slo_aware.py         # SLO 感知策略
│   │   ├── cost_optimized.py    # 成本优化策略
│   │   └── adaptive.py          # 自适应策略
│   ├── pd_routing.py             # PD 分离路由 - Prefilling/Decoding 优化
│   ├── router.py                 # 请求路由 - 负载均衡/亲和性/局部性
│   ├── parallelism.py            # 并行策略 - TP/PP/DP/EP/Hybrid
│   ├── monitoring.py             # 性能监控 - 指标收集与分析
│   ├── topology.py               # 拓扑检测 - NVLINK/NUMA 感知
│   ├── types.py                  # 类型定义 - 数据模型和枚举
│   └── examples/                 # 📖 使用示例
│       ├── example_http_client.py   # HTTP 客户端模式示例
│       ├── demo_control_plane.py    # 完整演示(无需 vLLM 实例)
│       └── README.md                # 示例文档
│
├── docs/                          # 📚 文档目录
│   ├── INTEGRATION.md            # 集成架构文档
│   ├── DEPLOYMENT.md             # 部署指南
│   ├── CUSTOM_SCHEDULING.md      # 自定义调度策略开发指南
│   ├── METRICS.md                # 监控指标文档
│   ├── TOPOLOGY.md               # 拓扑感知配置文档
│   └── FAULT_TOLERANCE.md        # 故障容错机制文档
│
├── vendors/vllm/                 # vLLM 源代码 (vendored)
│   ├── vllm/                     # Python 模块
│   ├── csrc/                     # CUDA 内核
│   ├── cmake/                    # 编译配置
│   └── ...
│
├── tests/
│   ├── control_plane/            # Control Plane 单元测试
│   │   ├── test_scheduling.py    # 调度测试 (5 tests)
│   │   ├── test_pd_separation.py # PD 分离测试 (5 tests)
│   │   ├── test_executor.py      # 执行器测试 (5 tests)
│   │   └── test_integration.py   # 集成测试 (5 tests)
│   │
│   └── vendors/vllm/tests/       # vLLM 原有测试
│
├── setup.py                      # 安装脚本
├── requirements.txt              # 依赖配置
├── requirements-dev.txt          # 开发依赖
└── README.md                     # 本文档

🎯 核心特性

1️⃣ 智能请求调度(5种策略)

Control Plane 提供多种调度策略,适应不同场景:

策略 特点 适用场景
FIFO 先到先得,简单公平 简单场景、公平处理
Priority 优先级排序 SaaS平台、分级服务
SLO-Aware SLO感知调度,保证延迟要求 有延迟要求的应用
Cost-Optimized 成本优化,在满足要求下最小化成本 云端部署、成本敏感
Adaptive 自适应选择,根据负载动态切换 生产环境、动态负载
from control_plane import ControlPlaneManager

# 创建控制平面,使用自适应调度
manager = ControlPlaneManager(
    scheduling_policy="adaptive",  # 可选:fifo, priority, slo_aware, cost_optimized
    routing_strategy="load_balanced",
    enable_monitoring=True,
)

2️⃣ PD 分离 - 性能优化(+50-80% 吞吐,-50-60% 延迟)

将 Prefilling(长输入处理)和 Decoding(生成输出)请求路由到专门优化的实例:

核心理念:

  • Prefilling 阶段(长输入):优化吞吐量 → 高 TP (4-8),大批处理
  • Decoding 阶段(短输入):优化延迟 → 低 TP (1),高并发
from control_plane import (
    ControlPlaneManager,
    ExecutionInstance,
    ExecutionInstanceType,
    PDSeparationConfig,
    PrefillingConfig,
    DecodingConfig,
)

# 启用 PD 分离
pd_config = PDSeparationConfig(
    enabled=True,
    routing_policy="adaptive",
    prefilling_threshold_input_tokens=800,
)

manager = ControlPlaneManager(
    scheduling_policy="adaptive",
    enable_pd_separation=True,
    pd_config=pd_config,
)

# Prefilling 实例 (优化吞吐)
prefilling_instance = ExecutionInstance(
    instance_id="prefill-1",
    host="localhost",
    port=8000,
    model_name="meta-llama/Llama-2-7b",
    instance_type=ExecutionInstanceType.PREFILLING,
    tensor_parallel_size=4,  # 高吞吐
    gpu_count=4,
    prefilling_config=PrefillingConfig(
        target_batch_size=64,
        enable_chunked_prefill=True,
    ),
)

# Decoding 实例 (优化延迟)
decoding_instance = ExecutionInstance(
    instance_id="decode-1",
    host="localhost",
    port=8001,
    model_name="meta-llama/Llama-2-7b",
    instance_type=ExecutionInstanceType.DECODING,
    tensor_parallel_size=1,  # 低延迟
    gpu_count=1,
    decoding_config=DecodingConfig(
        target_latency_ms=50,
        max_parallel_requests=200,
    ),
)

manager.register_instance(prefilling_instance)
manager.register_instance(decoding_instance)

性能对比:

指标 单实例 PD分离 提升
吞吐量 (tokens/s) 100 150-180 +50-80%
P99延迟 (ms) 120 50-60 -50-60%
GPU利用率 75% 90% +15%
成本效率 baseline 1.8x +80%

3️⃣ 动态并行策略(5种方案)

自动选择最优的模型并行方案,支持 TP、PP、DP、EP、Hybrid:

并行策略 说明 适用场景
TP (Tensor Parallel) 张量并行,模型权重切分 单模型太大无法放入单GPU
PP (Pipeline Parallel) 流水线并行,模型层切分 超大模型(70B+)
DP (Data Parallel) 数据并行,模型复制 高吞吐场景
EP (Expert Parallel) 专家并行,MoE模型 Mixtral等MoE模型
Hybrid 混合并行,组合多种策略 超大模型+高吞吐
from control_plane import ParallelismConfig

# 自动优化并行配置
config = ParallelismConfig(
    auto_optimize=True,
    supported_strategies=["TP", "PP", "Hybrid"],
)

# 手动指定并行配置
instance = ExecutionInstance(
    instance_id="hybrid-instance",
    tensor_parallel_size=4,     # TP=4
    pipeline_parallel_size=2,   # PP=2
    data_parallel_size=2,       # DP=2
    gpu_count=16,
)

并行方案推荐:

模型大小 GPU数量 推荐策略
<10B 1-2 TP=1 或 TP=2
10B-30B 2-4 TP=4
30B-70B 4-8 TP=4 或 TP=8
70B-175B 8-16 Hybrid (TP=4, PP=2)
>175B 16+ Hybrid (TP=8, PP=4)

4️⃣ 请求路由策略

支持多种路由算法,优化请求分发:

  • load_balanced: 负载均衡,路由到负载最低的实例
  • round_robin: 轮询
  • random: 随机选择
  • affinity: 用户亲和性,同一用户请求路由到同一实例(提高缓存命中率)
  • locality: 基于哈希的局部性路由,提高缓存命中率
manager = ControlPlaneManager(
    routing_strategy="affinity",  # 用户亲和性路由
)

5️⃣ 性能监控与指标

实时收集和分析性能指标:

# 获取性能指标
metrics = manager.get_metrics()

# 请求指标
print(f"Total requests: {metrics.total_requests}")
print(f"Completed: {metrics.completed_requests}")
print(f"Active: {metrics.active_requests}")

# 延迟指标
print(f"Avg latency: {metrics.avg_latency_ms}ms")
print(f"P95 latency: {metrics.p95_latency_ms}ms")
print(f"P99 latency: {metrics.p99_latency_ms}ms")

# 吞吐指标
print(f"Tokens/sec: {metrics.tokens_per_second}")
print(f"Requests/sec: {metrics.requests_per_second}")

# SLO指标
print(f"SLO violations: {metrics.slo_violations}")
print(f"SLO compliance: {metrics.slo_compliance_rate:.2%}")

# 资源指标
print(f"GPU utilization: {metrics.avg_gpu_utilization:.2%}")

🚀 快速开始

安装

# 克隆项目
git clone https://github.com/intellistream/sageLLM.git
cd sageLLM

# 安装依赖
pip install -r requirements.txt

# 开发模式安装
pip install -e .

基本使用

import asyncio
from control_plane import (
    ControlPlaneManager,
    ExecutionInstance,
    RequestMetadata,
    RequestPriority,
)


async def main():
    # 1. 创建控制平面管理器
    manager = ControlPlaneManager(
        scheduling_policy="adaptive",  # 自适应调度
        routing_strategy="load_balanced",  # 负载均衡
        enable_monitoring=True,
    )

    # 2. 注册 vLLM 实例
    instance = ExecutionInstance(
        instance_id="vllm-1",
        host="localhost",
        port=8000,
        model_name="meta-llama/Llama-2-7b-chat-hf",
        tensor_parallel_size=2,
        gpu_count=2,
        max_concurrent_requests=100,
    )
    manager.register_instance(instance)

    # 3. 启动控制平面
    await manager.start()

    # 4. 提交推理请求
    request = RequestMetadata(
        request_id="req-001",
        user_id="user-123",
        priority=RequestPriority.HIGH,
        slo_deadline_ms=1000,  # 1秒SLO
        max_tokens=512,
        prompt="Explain quantum computing in simple terms.",
    )

    request_id = await manager.submit_request(request)
    print(f"Request submitted: {request_id}")

    # 5. 等待并获取结果
    await asyncio.sleep(2)
    status = await manager.get_request_status(request_id)
    print(f"Request status: {status}")

    # 6. 获取性能指标
    metrics = manager.get_metrics()
    print(f"Throughput: {metrics.requests_per_second:.2f} req/s")
    print(f"Avg Latency: {metrics.avg_latency_ms:.2f} ms")

    # 7. 停止控制平面
    await manager.stop()


if __name__ == "__main__":
    asyncio.run(main())

高级使用示例

更详细的使用示例,请查看:

运行测试

# 运行所有 Control Plane 测试
cd tests/control_plane
python -m pytest -v

# 运行特定测试模块
python -m pytest test_scheduling.py -v      # 调度策略测试
python -m pytest test_pd_separation.py -v   # PD 分离测试
python -m pytest test_executor.py -v        # 执行器测试
python -m pytest test_integration.py -v     # 集成测试

# 生成覆盖率报告
python -m pytest --cov=control_plane tests/control_plane/

测试结果: ✅ 全部 20 个测试通过

  • ✅ 5 个调度测试 (test_scheduling.py)
  • ✅ 5 个 PD 分离测试 (test_pd_separation.py)
  • ✅ 5 个执行器测试 (test_executor.py)
  • ✅ 5 个集成测试 (test_integration.py)

async def main(): # 创建控制平面管理器 manager = ControlPlaneManager( scheduling_policy="adaptive", enable_pd_separation=True, )

# 注册 vLLM 实例
instance = ExecutionInstance(
    instance_id="llama-instance-1",
    model_name="meta-llama/Llama-2-7b-chat-hf",
    tensor_parallel_size=2,
    gpu_count=2,
)
manager.register_instance(instance)

# 处理请求
from vllm.sampling_params import SamplingParams

prompt = "Hello, how are you?"
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    max_tokens=512,
)

output = await manager.process_request(
    prompt=prompt,
    sampling_params=sampling_params,
)

print(f"Output: {output}")

# 获取性能指标
metrics = manager.get_metrics()
print(f"吞吐: {metrics.throughput} req/s")
print(f"平均延迟: {metrics.avg_latency} ms")

if name == "main": asyncio.run(main())


更详细的使用示例,请查看 [`control_plane/examples/`](./control_plane/examples/) 目录。

### 运行测试

```bash
# 运行所有 Control Plane 测试
cd tests/control_plane
python -m pytest -v

# 运行特定测试
python -m pytest test_scheduling.py -v
python -m pytest test_pd_separation.py -v

# 生成覆盖率报告
python -m pytest --cov=control_plane tests/control_plane/

测试结果: ✅ 全部 17 个测试通过

  • ✅ 5 个调度测试 (test_scheduling.py)
  • ✅ 5 个 PD 分离测试 (test_pd_separation.py)
  • ✅ 5 个执行器测试 (test_executor.py)
  • ✅ 2 个集成测试 (test_integration.py)

🏗️ 架构设计

整体架构

┌─────────────────────────────────────────────────────────────────┐
│                       User Application                           │
│                  (SAGE Apps, Custom Services)                    │
└────────────────────────────┬────────────────────────────────────┘
                             │ submit_request()
┌────────────────────────────▼────────────────────────────────────┐
│                    Control Plane (sageLLM)                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │           Control Plane Manager (核心管理器)              │   │
│  │  • 请求队列管理 (pending_queue, running_requests)         │   │
│  │  • 调度循环 (scheduling_loop)                             │   │
│  │  • 健康检查 (health_check_loop)                           │   │
│  │  • 性能监控 (performance monitoring)                      │   │
│  └──────────────────────────────────────────────────────────┘   │
│           │                  │                  │                │
│           ▼                  ▼                  ▼                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │ Scheduling   │  │ Parallelism  │  │ PD Router &  │          │
│  │ Policies     │  │ Optimizer    │  │ Routing      │          │
│  │              │  │              │  │              │          │
│  │ • FIFO       │  │ • Auto TP/PP │  │ • Adaptive   │          │
│  │ • Priority   │  │ • DP/EP      │  │ • Hash-based │          │
│  │ • SLO-Aware  │  │ • Hybrid     │  │ • LB/Affinity│          │
│  │ • Cost-Opt   │  │              │  │              │          │
│  │ • Adaptive   │  │              │  │              │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
│           │                  │                  │                │
│           └──────────────────┴──────────────────┘                │
│                              │                                   │
│                              ▼                                   │
│                  ┌──────────────────────┐                        │
│                  │ Execution Coordinator │                        │
│                  │  • Instance Registry  │                        │
│                  │  • Health Monitoring  │                        │
│                  │  • Metrics Collection │                        │
│                  └──────────────────────┘                        │
└──────────────────────────────┼───────────────────────────────────┘
                               │ HTTP API calls
┌──────────────────────────────▼───────────────────────────────────┐
│                     Execution Layer (vLLM)                       │
├─────────────────────────────────────────────────────────────────┤
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │
│  │ vLLM     │  │ vLLM     │  │ vLLM     │  │ vLLM     │        │
│  │ Instance │  │ Instance │  │ Instance │  │ Instance │        │
│  │    1     │  │    2     │  │    3     │  │    N     │        │
│  │          │  │          │  │          │  │          │        │
│  │ TP=4     │  │ TP=2,PP=2│  │ DP=2     │  │ Hybrid   │        │
│  │ Prefill  │  │ Decode   │  │ Decode   │  │ General  │        │
│  │ 优化吞吐  │  │ 优化延迟  │  │ 高并发   │  │ 混合负载 │        │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘        │
│                                                                   │
│  • GPU Memory: PagedAttention, KV Cache Management              │
│  • Kernels: CUDA, FlashAttention, FlashInfer                    │
│  • Quantization: GPTQ, AWQ, FP8, INT8                           │
└─────────────────────────────────────────────────────────────────┘

请求处理流程

1. User App submits RequestMetadata
          ↓
2. Control Plane Manager receives request
          ↓
3. Scheduling Policy determines priority/order
          ↓
4. PD Router (if enabled) determines request phase
   • Prefilling phase (long input)
   • Decoding phase (short input)
          ↓
5. Request Router selects appropriate instance
   • Load balancing
   • Affinity/Locality
   • Health check
          ↓
6. Execution Coordinator executes via HTTP API
   • POST /v1/completions or /v1/chat/completions
   • Stream or batch response
          ↓
7. vLLM Instance processes request
   • AsyncLLMEngine execution
   • KV cache management
   • GPU scheduling
          ↓
8. Response returns to Control Plane
          ↓
9. Metrics collected and updated
          ↓
10. Result returns to User App

核心组件说明

1. Control Plane Manager (manager.py)

  • 核心协调层,管理整个请求生命周期
  • 维护请求队列和运行状态
  • 协调各个子组件工作

2. Scheduling Strategies (strategies/)

  • 5种调度策略:FIFO、Priority、SLO-Aware、Cost-Optimized、Adaptive
  • 模块化设计,每个策略独立文件
  • 支持自定义策略开发(参见 docs/CUSTOM_SCHEDULING.md

3. PD Router (pd_routing.py)

  • Prefilling/Decoding 分离路由
  • 根据请求特征(输入长度、输出长度)判断阶段
  • 将请求路由到专门优化的实例

4. Request Router (router.py)

  • 请求路由和负载均衡
  • 支持多种路由策略:load_balanced、round_robin、affinity、locality
  • 考虑实例健康状态和当前负载

5. Parallelism Optimizer (parallelism.py)

  • 自动选择最优并行策略
  • 支持 TP、PP、DP、EP、Hybrid
  • 根据模型大小和 GPU 数量推荐配置

6. Execution Coordinator (executor.py)

  • 管理所有 vLLM 实例
  • 执行 HTTP API 调用
  • 健康检查和指标收集

7. Types (types.py)

  • 数据模型定义
  • 枚举类型
  • 配置类

📚 文档

⚙️ 环境设置

GPU 支持 (生产环境推荐)

# 安装 CUDA Toolkit (Ubuntu/Debian)
sudo apt update && sudo apt install -y nvidia-cuda-toolkit

# 验证 CUDA 安装
nvcc --version
nvidia-smi

# 安装 vLLM(会自动编译 CUDA 内核)
pip install vllm

# 重新安装 sageLLM (如需编译扩展)
pip install -e .

CPU 测试环境

# 测试可以在没有 GPU 的环境下运行(仅用于单元测试)
cd tests/control_plane
python -m pytest -v

# 注意:实际推理需要 GPU

🔗 依赖关系

核心依赖

  • vLLM (>= 0.3.0): LLM 推理引擎
  • PyTorch (>= 2.0.0): 深度学习框架
  • Python (>= 3.8): 编程语言

可选依赖

  • asyncio: 异步编程(Python 内置)
  • pydantic: 数据验证
  • pytest: 单元测试
  • pytest-cov: 测试覆盖率

详见 requirements.txtrequirements-dev.txt

🚢 部署

本地开发

# 启动单个 vLLM 实例
python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Llama-2-7b \
    --host 0.0.0.0 \
    --port 8000 \
    --tensor-parallel-size 1

# 启动 Control Plane
python -m control_plane.example

生产环境

参考 部署指南 了解:

  • 多实例部署
  • PD 分离配置
  • 负载均衡设置
  • 监控和日志

🎓 使用场景

场景 1: 高吞吐批处理

# 使用 DP (数据并行) 提高吞吐
manager = ControlPlaneManager(scheduling_policy="fifo")
instance = ExecutionInstance(
    instance_id="batch-instance",
    data_parallel_size=4,
    gpu_count=4,
)

场景 2: 低延迟在线服务

# 使用 SLO-Aware 调度 + PD 分离
manager = ControlPlaneManager(
    scheduling_policy="slo_aware",
    enable_pd_separation=True,
)
# 注册 decoding 优化实例(低延迟)

场景 3: 混合优先级

# 使用 Priority 调度
manager = ControlPlaneManager(scheduling_policy="priority")

# 高优先级请求
high_priority_request = RequestMetadata(
    priority=RequestPriority.CRITICAL,
    slo_deadline_ms=500,
)

# 低优先级请求
low_priority_request = RequestMetadata(
    priority=RequestPriority.LOW,
)

场景 4: 成本优化

# 使用 Cost-Optimized 调度
manager = ControlPlaneManager(scheduling_policy="cost_optimized")

# 设置成本预算
request = RequestMetadata(
    cost_budget=0.01,  # 最多花费 $0.01
)

📄 许可

本项目采用 Apache 2.0 许可证,详见 LICENSE

🤝 贡献

欢迎贡献!请查看 CONTRIBUTING.md 了解如何参与开发。

快速开始贡献

# Fork 和 Clone
git clone https://github.com/yourusername/SAGE.git
cd packages/sage-common/src/sage/common/components/sage_llm/sageLLM

# 创建特性分支
git checkout -b feature/your-feature

# 修改代码并提交
git add .
git commit -m "feat: your feature description"

# Push 并创建 PR
git push origin feature/your-feature

📮 联系方式

  • 📧 邮件:请通过 GitHub Issues 联系
  • 💬 讨论:使用 GitHub Discussions
  • 🐛 Bug 报告:GitHub Issues

🙏 致谢

  • 感谢 vLLM 项目 提供优秀的 LLM 推理引擎
  • 感谢 SAGE 项目团队的支持和指导

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 7