共计 3501 个字符,预计需要花费 9 分钟才能阅读完成。
背景痛点
在实际开发中,直接调用 ChatGPT API 会遇到几个典型问题:

- 认证泄露风险 :API 密钥硬编码在代码中或提交到版本库,容易造成安全漏洞
- 响应解析复杂度 :处理流式响应时,需要手动拼接分块数据并处理中间状态
- 速率限制问题 :突发大量请求容易触发 API 的速率限制,需要完善的退避机制
技术对比
| 对比维度 | requests 库 | openai 官方库 |
|---|---|---|
| 可定制性 | 高,可完全控制 HTTP 细节 | 中,部分参数被封装 |
| 依赖项 | 仅需 requests | 需要安装 openai 包及更多依赖 |
| 错误处理 | 需手动处理 HTTP 状态码 | 封装了 API 特定错误类型 |
| 流式响应支持 | 需手动实现分块读取 | 内置生成器支持 |
| 开发效率 | 较低,需自行封装 | 较高,开箱即用 |
核心实现
基础请求模板
import requests
from typing import Optional, Dict, Any
def chat_completion(
api_key: str,
prompt: str,
model: str = "gpt-3.5-turbo",
temperature: float = 0.7,
) -> Dict[str, Any]:
"""
基础请求封装
:param api_key: OpenAI API 密钥
:param prompt: 用户输入的提示文本
:param model: 使用的模型名称
:param temperature: 生成文本的随机性控制
:return: API 响应 JSON
"""headers = {"Authorization": f"Bearer {api_key}","Content-Type":"application/json",
}
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature,
}
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=payload,
timeout=30,
)
response.raise_for_status()
return response.json()
流式响应处理
from typing import Iterator
def stream_chat_completion(api_key: str, prompt: str, **kwargs) -> Iterator[str]:
"""
流式响应生成器实现
:yields: 每个分块的文本内容
"""headers = {"Authorization": f"Bearer {api_key}","Content-Type":"application/json",
}
payload = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": prompt}],
"stream": True,
**kwargs,
}
try:
with requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=payload,
stream=True,
timeout=60,
) as response:
response.raise_for_status()
for chunk in response.iter_lines():
if chunk:
decoded = chunk.decode("utf-8")
if decoded.startswith("data:"):
yield decoded[5:].strip()
except requests.exceptions.RequestException as e:
print(f"Stream request failed: {e}")
raise
自动重试装饰器
import time
from functools import wraps
from typing import Callable, TypeVar, Any
T = TypeVar("T")
def retry_with_backoff(
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 10.0,
):
"""
指数退避重试装饰器
:param max_retries: 最大重试次数
:param initial_delay: 初始延迟 (秒)
:param max_delay: 最大延迟 (秒)
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args, **kwargs) -> T:
delay = initial_delay
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except requests.exceptions.RequestException as e:
last_exception = e
if attempt == max_retries:
break
time.sleep(min(delay, max_delay))
delay *= 2 # 指数退避
raise last_exception or RuntimeError("Unknown error occurred")
return wrapper
return decorator
生产建议
请求池化实现
import requests.adapters
class APIClient:
def __init__(self, api_key: str, pool_connections: int = 10):
self.session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=pool_connections,
pool_maxsize=pool_connections,
)
self.session.mount("https://", adapter)
self.api_key = api_key
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()
@retry_with_backoff()
def chat_completion(self, prompt: str, **kwargs):
headers = {"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
# 其余实现同前
密钥安全上下文
from contextlib import contextmanager
import os
@contextmanager
def api_key_context(env_var: str = "OPENAI_API_KEY"):
"""
安全管理 API 密钥的上下文管理器
:param env_var: 存储密钥的环境变量名
"""
api_key = os.getenv(env_var)
if not api_key:
raise ValueError(f"Environment variable {env_var} not set")
try:
yield api_key
finally:
# 可添加密钥使用后的清理逻辑
pass
Token 消耗监控
def token_monitor(func):
"""监控 API 调用 token 消耗的装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if isinstance(result, dict) and "usage" in result:
print(f"Token usage: {result['usage']}")
return result
return wrapper
延伸思考
本文已经展示了同步调用的最佳实践,但在实际生产环境中,异步批处理可能更为高效。例如:
- 如何设计基于 asyncio 的异步请求管道?
- 当需要处理大量独立请求时,怎样实现优先级队列?
- 对于超长对话场景,如何智能分割上下文以避免 token 超限?
这些问题的解决方案将是我们下一篇文章的重点,读者可以先尝试基于 aiohttp 实现基础版本。
正文完
