共计 2201 个字符,预计需要花费 6 分钟才能阅读完成。
背景痛点
将 ChatGPT 集成到搜索引擎时,开发者通常会面临以下几个核心挑战:
- 长尾查询的响应延迟 :用户输入的查询多样化,ChatGPT 对复杂或模糊查询的处理时间波动较大,直接影响搜索体验。
- 多轮对话的会话状态维护 :需要高效管理用户上下文,避免对话断裂或信息丢失,这对分布式系统设计提出高要求。
- OpenAI API 的速率限制规避 :免费版和付费版 API 均有严格 QPS 限制,直接高频调用易触发 429 错误(Too Many Requests)。
技术方案
架构设计
系统采用分层架构,核心模块包括:
- 负载均衡层 :使用 Nginx 分发请求,按用户哈希分配会话 ID,确保同一用户请求路由到相同后端实例。
- 请求队列层 :RabbitMQ 缓冲突发流量,配合优先级队列处理 VIP 用户请求。
- 处理引擎层 :
- 查询预处理:清洗输入文本并识别意图
- 动态批处理:将 5ms 内相似请求合并为单个 API 调用
- 结果缓存:Redis 存储高频查询结果,TTL 动态调整
- 降级容灾层 :API 不可用时自动切换至基于 ES 的关键词搜索

核心组件
查询预处理模块
- 敏感词过滤 :使用 DFA 算法实现毫秒级匹配
- 意图识别 :集成 BERT 分类器区分 ” 问答类 ” 和 ” 检索类 ” 查询
动态批处理引擎
- 时间窗口:合并 200ms 内的同类别请求
- 相似度计算:通过 Sentence-BERT 编码 + 余弦相似度聚类
指数退避重试
async def call_api_with_retry(prompt, max_retries=3):
base_delay = 1.0
for attempt in range(max_retries):
try:
return await openai_chat(prompt)
except RateLimitError:
delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
await asyncio.sleep(delay)
raise ServiceUnavailableError()
代码实现
异步流式响应
import aiohttp
from redis import asyncio as aioredis
class ChatGPTStreamer:
def __init__(self):
self.redis = aioredis.from_url("redis://localhost")
async def stream_response(self, session_id, query):
# 检查缓存
cached = await self.redis.get(f"cache:{query}")
if cached:
yield cached.decode()
return
# 流式调用 API
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {self._get_token()}"},
json={"model": "gpt-4", "messages": self._build_context(session_id, query)},
timeout=30
) as resp:
async for chunk in resp.content:
yield chunk
await self.redis.setex(f"cache:{query}", 3600, chunk)
滑动窗口限流
type RateLimiter struct {
windowSize time.Duration
maxRequests int
requests map[string][]time.Time
mutex sync.Mutex
}
func (r *RateLimiter) Allow(userIP string) bool {r.mutex.Lock()
defer r.mutex.Unlock()
now := time.Now()
timestamps := r.requests[userIP]
// 移除过期请求
for len(timestamps) > 0 && now.Sub(timestamps[0]) > r.windowSize {timestamps = timestamps[1:]
}
if len(timestamps) >= r.maxRequests {return false}
r.requests[userIP] = append(timestamps, now)
return true
}
生产考量
性能指标
| 批处理大小 | 平均延迟 (ms) | 吞吐量 (QPS) |
|---|---|---|
| 1 | 1200 | 8 |
| 5 | 1500 | 35 |
| 10 | 1800 | 60 |
容灾方案
- 分级降级策略 (Fallback Mechanism):
- 优先尝试备用 API 端点
- 返回最近 24 小时的缓存结果
- 触发传统关键词搜索
避坑指南
- 429 响应处理 :必须实现断路器模式 (Circuit Breaker),当错误率超过阈值时暂停请求
- 会话安全 :使用 AES-GCM 加密会话 ID,避免在日志中记录完整对话历史
- 缓存预热 :启动时加载高频查询的预生成结果,防止缓存穿透
延伸思考
结合 RAG 技术
- 使用向量数据库存储知识库,通过相似度检索补充上下文
- 对 ChatGPT 输出进行事实性校验
百万级 QPS 架构
- 引入边缘计算:在全球部署多个推理节点
- 模型蒸馏:训练轻量级专用模型处理 80% 常见查询
- 请求预测:基于用户行为提前生成可能需要的回答
正文完
