共计 2590 个字符,预计需要花费 7 分钟才能阅读完成。
背景分析:多模型集成的那些头疼事
在实际开发中接入多个大模型 API 时,就像要同时跟讲不同方言的技术团队合作。以下是几个最常见的痛点:

- 协议不统一:有的用 RESTful JSON,有的走 gRPC,还有的直接返回原始文本流
- 认证方式五花八门:API Key 可能在 Header、URL 参数甚至请求体里
- 响应结构各异 :成功时有的返回
data字段,有的直接给result,错误时有的用 HTTP 状态码,有的在 body 里藏错误码 - 性能差异大:不同模型的响应时间可能从几百毫秒到十几秒不等
架构设计:用 Claude 当 ” 翻译官 ”
我们的解决方案是构建一个代理网关层,主要包含这些核心模块:
- 流量路由:根据请求参数决定转发到哪个模型服务
- 协议转换:将外部统一接口转为各模型需要的格式
- 负载均衡:当有多个同类型模型实例时做智能分发
- 熔断保护:防止某个模型服务拖垮整个系统
架构示意图:
客户端 → 统一 API 网关 → [协议转换] → [路由决策] → 各模型 API
↑ ↓
[监控告警] ← [性能统计]
代码实现:Python 异步网关核心
以下是使用 FastAPI + aiohttp 的异步实现示例(已省略部分样板代码):
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import aiohttp
from prometheus_client import Counter, make_asgi_app
# 监控指标
REQUEST_COUNT = Counter('api_requests', 'Total API requests', ['model_type'])
app = FastAPI()
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
class ModelRequest(BaseModel):
text: str
model_type: str # claude/gpt/llama etc.
async def call_model_api(url: str, payload: dict, headers: dict) -> dict:
async with aiohttp.ClientSession() as session:
try:
async with session.post(
url,
json=payload,
headers=headers,
timeout=10
) as resp:
if resp.status != 200:
return {"error": await resp.text()}
return await resp.json()
except asyncio.TimeoutError:
return {"error": "Request timeout"}
@app.post("/v1/complete")
async def unified_api(request: ModelRequest):
# 路由决策
if request.model_type == "claude":
endpoint = "https://api.anthropic.com/v1/complete"
headers = {"Authorization": f"Bearer {CLAUDE_KEY}"}
payload = {"prompt": request.text}
elif request.model_type == "gpt":
endpoint = "https://api.openai.com/v1/completions"
headers = {"Authorization": f"Bearer {GPT_KEY}"}
payload = {"model": "text-davinci-003", "prompt": request.text}
else:
raise HTTPException(status_code=400, detail="Unsupported model")
REQUEST_COUNT.labels(model_type=request.model_type).inc()
# 带重试机制的调用
max_retries = 3
for attempt in range(max_retries):
result = await call_model_api(endpoint, payload, headers)
if "error" not in result:
break
await asyncio.sleep(1 * (attempt + 1))
# 统一响应格式
return {"data": result.get("completion") or result.get("choices")[0]["text"],
"model_used": request.model_type
}
性能优化实战技巧
- 批处理请求:当需要处理大量相似请求时
async def batch_process(texts: List[str], model: str):
semaphore = asyncio.Semaphore(100) # 控制并发量
async with aiohttp.ClientSession() as session:
tasks = [process_one(session, text, model, semaphore) for text in texts]
return await asyncio.gather(*tasks)
- 缓存策略:对相同 prompt 进行缓存
from aiocache import cached
@cached(ttl=3600)
async def get_cached_response(prompt: str, model: str) -> dict:
# 实际 API 调用逻辑
生产环境避坑指南
- 认证失效:各平台的 API Key 有效期不同,建议实现自动刷新机制
- 超时设置:根据模型特性设置不同超时(文案生成 5 秒,代码生成可放宽到 30 秒)
- 限流处理:在网关层实现令牌桶算法,避免触发上游限制
- 错误兜底:当主模型不可用时自动切换备用模型
延伸思考
- 如何在不重启服务的情况下动态添加新模型支持?
- 当需要同时调用多个模型做结果对比时,架构该如何调整?
- 怎样设计 AB 测试框架来评估不同模型的业务效果?
在实际项目中,我们通过这套方案将不同模型的响应时间差异对业务的影响降低了 70%,同时运维复杂度显著下降。建议先从核心路由功能开始实现,再逐步添加高级特性。
正文完
