本文目录导读:
我来详细解释AI流式处理的逻辑和实现方案。
核心原理
流式处理的核心是Token逐个生成和传输,而非等待完整响应。
基本工作流程
用户请求 → 模型逐token生成 → 实时推送到前端
实现代码示例
后端实现 (Python + FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def generate_stream(prompt: str):
# 模拟AI逐步生成文本
words = ["这是", "一段", "流式", "生成的", "文本"]
for word in words:
yield f"data: {word}\n\n"
await asyncio.sleep(0.1) # 模拟生成延迟
@app.post("/chat/stream")
async def chat_stream(prompt: str):
return StreamingResponse(
generate_stream(prompt),
media_type="text/event-stream"
)
前端实现 (JavaScript/TypeScript)
// 使用Fetch API处理流式响应
async function handleStreamResponse(prompt) {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ prompt })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// 更新UI显示
updateUI(chunk);
}
}
使用 Server-Sent Events (SSE)
// 更简洁的SSE方式
const eventSource = new EventSource('/chat/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
// 逐token更新显示
appendToOutput(data.token);
};
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
eventSource.close();
};
主流AI框架实现
OpenAI API 流式处理
import openai
# OpenAI 流式请求
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "你好"}],
stream=True # 关键参数
)
for chunk in response:
if 'choices' in chunk:
delta = chunk['choices'][0].get('delta', {})
token = delta.get('content', '')
print(token, end='') # 逐个打印
LangChain 实现
from langchain.llms import OpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
# 流式输出
llm = OpenAI(
streaming=True, # 启用流式
callbacks=[StreamingStdOutCallbackHandler()],
temperature=0
)
llm("讲一个关于机器人的故事")
流式处理关键技术点
事件格式
data: {"token": "你好", "done": false}\n\n
data: {"token": "世界", "done": false}\n\n
data: {"token": "", "done": true}\n\n
缓冲区管理
class StreamBuffer {
constructor() {
this.buffer = '';
this.delimiters = ['\n', ' ', ''];
}
processChunk(chunk) {
this.buffer += chunk;
// 按完整token分割
return this.splitTokens();
}
}
错误处理
# 断线重连逻辑
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
# 流式请求
break
except ConnectionError:
await asyncio.sleep(retry_delay * attempt)
性能优化策略
Token级控制
- 调整生成速度 (temperature参数)
- 设置最大token限制
网络优化
# 设置合适的buffer大小
class StreamSettings:
CHUNK_SIZE = 1024 # 1KB chunks
BUFFER_SIZE = 4096 # 4KB buffer
前端渲染优化
// 使用requestAnimationFrame优化渲染
let pendingTokens = [];
function processFrame() {
if (pendingTokens.length > 0) {
const token = pendingTokens.shift();
renderToken(token);
}
requestAnimationFrame(processFrame);
}
实际应用场景
- 聊天机器人 - 实时显示回答
- 代码补全 - 逐字符完成
- 翻译服务 - 逐词翻译显示
- 文档生成 - 逐步生成内容
完整项目结构示例
stream-ai-app/
├── backend/
│ ├── main.py # FastAPI应用
│ ├── stream_handler.py # 流式处理逻辑
│ └── models/
├── frontend/
│ ├── index.html
│ ├── stream.js # 客户端流处理
│ └── styles.css
└── README.md
这种处理方式能让用户体验到AI"实时思考"和"逐字输出"的效果,显著提升交互体验。
标签: 源码AI流式处理逻辑