- 🚀 流式推送核心:基于 SSE (Server-Sent Events) 实现实时流式输出
- 🔌 高扩展性:通过适配器模式轻松接入多种 AI 提供商
- 🏗️ 生产级实现:参考 OpenAI、DeepSeek 等主流 AI 服务的成熟方案
- 📦 开箱即用:提供完整示例,5 分钟快速上手
- 🎯 类型安全:完整的类型定义和接口规范
- ⚡ 高性能:基于 Go channel 的异步处理机制
git clone https://github.com/ONEMYX/aiChatFlow.git
cd aiChatFlow
go mod download方式一:模拟示例(无需 API Key)
go run examples/simple/main.go
# 访问 http://localhost:8080方式二:OpenAI 真实测试
# 1. 配置 API Key
export OPENAI_API_KEY="your-api-key"
# 2. 运行测试
go run examples/openai_test/main.go
# 3. 访问 http://localhost:8080
# 点击"开始流式对话"体验真实的 AI 流式输出aiChatFlow/
├── stream/ # 核心流式输出模块
│ └── sse.go # SSE 流式推送实现(核心)
├── adapter/ # AI 提供商适配器接口
│ └── provider.go # 统一的提供商接口定义
├── provider/ # 具体提供商实现
│ └── openai/ # OpenAI 提供商
│ └── openai.go
├── examples/ # 使用示例
│ ├── simple/ # 模拟示例
│ └── openai_test/ # OpenAI 真实测试
├── ARCHITECTURE.md # 架构设计文档
└── QUICKSTART.md # 快速入门指南
// 创建 SSE 写入器
writer, _ := stream.NewSSEWriter(w, r)
// 写入数据并立即刷新
writer.WriteData(data)关键点:
- 设置正确的 HTTP 响应头
- 立即刷新缓冲区(
Flusher.Flush()) - 监听客户端断开(
context.Done())
所有 AI 提供商实现统一接口:
type AIProvider interface {
ChatStream(ctx context.Context, req ChatRequest, output chan<- string) error
Name() string
}优势:
- 统一抽象,易于扩展
- 通过 channel 传递流式内容
- 工厂模式管理多个提供商
import (
"github.com/ONEMYX/aiChatFlow/stream"
"github.com/ONEMYX/aiChatFlow/provider/openai"
)
// 1. 创建流式处理器
handler, _ := stream.NewStreamHandler(w, r)
// 2. 执行流式处理
handler.Stream(ctx, func(ctx context.Context, output chan<- string) error {
// 3. 逐步发送内容
output <- "Hello"
output <- " "
output <- "World"
return nil
})// 创建 Provider
provider, _ := openai.NewProvider(openai.Config{
APIKey: "your-api-key",
Model: "gpt-3.5-turbo",
})
// 流式对话
handler.Stream(ctx, func(ctx context.Context, output chan<- string) error {
return provider.ChatStream(ctx, adapter.ChatRequest{
Messages: []adapter.Message{
{Role: "user", Content: "你好"},
},
}, output)
})只需实现 AIProvider 接口:
type MyAIProvider struct {
apiKey string
}
func (p *MyAIProvider) Name() string {
return "myai"
}
func (p *MyAIProvider) ChatStream(ctx context.Context, req ChatRequest, output chan<- string) error {
// 1. 调用第三方 API
// 2. 读取流式响应
// 3. 解析并发送到 output channel
for {
chunk := readFromAPI()
select {
case <-ctx.Done():
return ctx.Err()
case output <- chunk:
}
}
}
// 注册到工厂
factory.Register("myai", &MyAIProvider{apiKey: "xxx"})Client (浏览器)
↓ EventSource
StreamHandler
↓ SSE Protocol
↓ Channel
AI Provider
↓ HTTP Stream
OpenAI/DeepSeek API
-
SSE vs WebSocket
- SSE:单向通信,适合服务器推送
- 浏览器原生支持,实现简单
- HTTP/1.1 兼容性好
-
缓冲区刷新
fmt.Fprintf(w, "data: %s\n\n", content) flusher.Flush() // 立即发送
-
Channel 通信
- 解耦数据生成和发送
- 天然的背压机制
- 支持异步处理
-
添加中间件
- 认证/鉴权 - 限流控制 - 日志记录 - 错误恢复
-
性能优化
- 连接池管理 - 请求去重 - 响应缓存
-
监控指标
- 活跃连接数 - 响应延迟 - 错误率 - 吞吐量
handler.Stream(ctx, func(ctx context.Context, output chan<- string) error {
if err := someOperation(); err != nil {
return fmt.Errorf("operation failed: %w", err)
}
return nil
})ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
defer cancel()
handler.Stream(ctx, generator)- OpenAI API: 流式响应文档
- DeepSeek API: OpenAI 兼容接口
- SSE 规范: MDN - Server-Sent Events
欢迎提交 Issue 和 Pull Request!
如果这个项目对你有帮助,请给一个 Star ⭐
Made with ❤️ by AI Chat Flow Contributors