Python Requests调用ChatGPT API实战指南:从基础到生产环境优化

1次阅读
没有评论

共计 3501 个字符,预计需要花费 9 分钟才能阅读完成。

image.webp

背景痛点

在实际开发中,直接调用 ChatGPT API 会遇到几个典型问题:

Python Requests 调用 ChatGPT API 实战指南:从基础到生产环境优化

  1. 认证泄露风险 :API 密钥硬编码在代码中或提交到版本库,容易造成安全漏洞
  2. 响应解析复杂度 :处理流式响应时,需要手动拼接分块数据并处理中间状态
  3. 速率限制问题 :突发大量请求容易触发 API 的速率限制,需要完善的退避机制

技术对比

对比维度 requests 库 openai 官方库
可定制性 高,可完全控制 HTTP 细节 中,部分参数被封装
依赖项 仅需 requests 需要安装 openai 包及更多依赖
错误处理 需手动处理 HTTP 状态码 封装了 API 特定错误类型
流式响应支持 需手动实现分块读取 内置生成器支持
开发效率 较低,需自行封装 较高,开箱即用

核心实现

基础请求模板

import requests
from typing import Optional, Dict, Any


def chat_completion(
    api_key: str,
    prompt: str,
    model: str = "gpt-3.5-turbo",
    temperature: float = 0.7,
) -> Dict[str, Any]:
    """
    基础请求封装
    :param api_key: OpenAI API 密钥
    :param prompt: 用户输入的提示文本
    :param model: 使用的模型名称
    :param temperature: 生成文本的随机性控制
    :return: API 响应 JSON
    """headers = {"Authorization": f"Bearer {api_key}","Content-Type":"application/json",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
    }

    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers=headers,
        json=payload,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

流式响应处理

from typing import Iterator


def stream_chat_completion(api_key: str, prompt: str, **kwargs) -> Iterator[str]:
    """
    流式响应生成器实现
    :yields: 每个分块的文本内容
    """headers = {"Authorization": f"Bearer {api_key}","Content-Type":"application/json",
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
        **kwargs,
    }

    try:
        with requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            stream=True,
            timeout=60,
        ) as response:
            response.raise_for_status()

            for chunk in response.iter_lines():
                if chunk:
                    decoded = chunk.decode("utf-8")
                    if decoded.startswith("data:"):
                        yield decoded[5:].strip()
    except requests.exceptions.RequestException as e:
        print(f"Stream request failed: {e}")
        raise

自动重试装饰器

import time
from functools import wraps
from typing import Callable, TypeVar, Any

T = TypeVar("T")


def retry_with_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 10.0,
):
    """
    指数退避重试装饰器
    :param max_retries: 最大重试次数
    :param initial_delay: 初始延迟 (秒)
    :param max_delay: 最大延迟 (秒)
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            delay = initial_delay
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.RequestException as e:
                    last_exception = e
                    if attempt == max_retries:
                        break

                    time.sleep(min(delay, max_delay))
                    delay *= 2  # 指数退避

            raise last_exception or RuntimeError("Unknown error occurred")
        return wrapper
    return decorator

生产建议

请求池化实现

import requests.adapters

class APIClient:
    def __init__(self, api_key: str, pool_connections: int = 10):
        self.session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=pool_connections,
            pool_maxsize=pool_connections,
        )
        self.session.mount("https://", adapter)
        self.api_key = api_key

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()

    @retry_with_backoff()
    def chat_completion(self, prompt: str, **kwargs):
        headers = {"Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        # 其余实现同前 

密钥安全上下文

from contextlib import contextmanager
import os

@contextmanager
def api_key_context(env_var: str = "OPENAI_API_KEY"):
    """
    安全管理 API 密钥的上下文管理器
    :param env_var: 存储密钥的环境变量名
    """
    api_key = os.getenv(env_var)
    if not api_key:
        raise ValueError(f"Environment variable {env_var} not set")

    try:
        yield api_key
    finally:
        # 可添加密钥使用后的清理逻辑
        pass

Token 消耗监控

def token_monitor(func):
    """监控 API 调用 token 消耗的装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)

        if isinstance(result, dict) and "usage" in result:
            print(f"Token usage: {result['usage']}")

        return result
    return wrapper

延伸思考

本文已经展示了同步调用的最佳实践,但在实际生产环境中,异步批处理可能更为高效。例如:

  1. 如何设计基于 asyncio 的异步请求管道?
  2. 当需要处理大量独立请求时,怎样实现优先级队列?
  3. 对于超长对话场景,如何智能分割上下文以避免 token 超限?

这些问题的解决方案将是我们下一篇文章的重点,读者可以先尝试基于 aiohttp 实现基础版本。

正文完
 0
评论(没有评论)