如何用Python构建免费ChatGPT对话导出工具:从API调用到数据持久化

1次阅读
没有评论

共计 2886 个字符,预计需要花费 8 分钟才能阅读完成。

image.webp

需求场景分析与现有方案不足

在日常开发中,我们经常需要将 ChatGPT 的对话记录导出进行分析或存档。典型场景包括:

如何用 Python 构建免费 ChatGPT 对话导出工具:从 API 调用到数据持久化

  • 知识管理:将有价值的对话整理成可检索的知识库
  • 内容分析:统计高频话题或用户提问模式
  • 团队协作:分享技术讨论记录

然而,目前官方并未提供便捷的导出功能,网页端仅支持手动复制粘贴。现有解决方案主要有:

  1. 浏览器控制台爬取:不稳定且违反服务条款
  2. 第三方插件:存在数据安全风险
  3. 截图保存:无法进行后续处理

技术方案对比

我们评估了三种技术路线:

  • 网页爬取:
  • 优点:无需 API 权限
  • 缺点:易被反爬,维护成本高

  • 官方 API:

  • 优点:稳定合规
  • 缺点:需要开发成本

  • 第三方工具:

  • 优点:开箱即用
  • 缺点:功能受限,隐私风险

综合比较,我们选择基于官方 API 开发自研工具,既保证合规性又能灵活定制。

核心实现

1. 异步 API 调用

使用 aiohttp 实现高效并发请求:

import aiohttp
from typing import AsyncIterator

class ChatGPTExporter:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.session = aiohttp.ClientSession()

    async def fetch_conversations(self) -> AsyncIterator[dict]:
        headers = {"Authorization": f"Bearer {self.api_key}"}
        async with self.session.get(
            "https://api.openai.com/v1/conversations",
            headers=headers
        ) as response:
            response.raise_for_status()
            data = await response.json()
            for conv in data["items"]:
                yield conv

2. 数据清洗管道

关键清洗步骤:

  1. 去除系统提示消息
  2. 合并连续用户消息
  3. 识别会话边界
  4. 提取元数据(时间戳、模型版本等)
def clean_message(raw: dict) -> dict:
    """标准化单条消息格式"""
    return {"id": raw["id"],
        "role": raw["role"],
        "content": raw["content"].strip(),
        "timestamp": parse_time(raw["created_at"])
    }

3. 多格式导出器

以 Markdown 表格为例的生成算法:

def to_markdown(conversations: list[dict]) -> str:
    """生成带格式的 Markdown 表格"""
    table = ["| 时间 | 角色 | 内容 |\n|------|------|------|"]
    for msg in conversations:
        row = f"| {msg['timestamp']} | {msg['role']} | {escape(msg['content'])} |"
        table.append(row)
    return "\n".join(table)

生产级优化

速率限制规避

实现指数退避重试机制:

async def fetch_with_retry(url: str, max_retries: int = 3) -> dict:
    retry_delay = 1
    for attempt in range(max_retries):
        try:
            return await self.session.get(url)
        except RateLimitError:
            await asyncio.sleep(retry_delay * (2 ** attempt))
    raise APIError("Max retries exceeded")

敏感信息过滤

使用正则表达式实现关键词过滤:

SENSITIVE_PATTERNS = [r"\b(api[_-]?key|token)\b",
    r"\d{4}[-]?\d{4}[-]?\d{4}[-]?\d{4}"  # 信用卡号
]

def sanitize(text: str) -> str:
    for pattern in SENSITIVE_PATTERNS:
        text = re.sub(pattern, "[REDACTED]", text, flags=re.IGNORECASE)
    return text

本地缓存设计

采用 SQLite 实现增量导出:

class ConversationCache:
    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        """创建消息存储表"""
        self.conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id TEXT PRIMARY KEY,
            conversation_id TEXT,
            content TEXT,
            timestamp INTEGER
        )""")

常见问题解决方案

会话 ID 重复

实现基于哈希值的去重逻辑:

def get_conversation_hash(msgs: list[dict]) -> str:
    """根据消息内容和时间生成唯一哈希"""
    combined = "".join(f"{m['content']}{m['timestamp']}"
        for m in sorted(msgs, key=lambda x: x["timestamp"])
    )
    return hashlib.md5(combined.encode()).hexdigest()

内存优化

对于大规模导出,采用分块处理:

  1. 按会话 ID 分组
  2. 每处理 100 个会话保存一次中间结果
  3. 使用生成器避免全量加载

时区处理

统一转换为 UTC 时间:

from datetime import datetime, timezone

def parse_time(timestamp: str) -> str:
    dt = datetime.fromisoformat(timestamp.rstrip("Z"))
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

扩展思考

本方案可扩展支持其他 AI 服务,关键适配点包括:

  1. API 客户端实现
  2. 消息格式转换器
  3. 服务特定参数配置

建议通过抽象基类定义统一接口:

from abc import ABC, abstractmethod

class AIConversationExporter(ABC):
    @abstractmethod
    async def fetch_messages(self) -> AsyncIterator[dict]:
        pass

    @abstractmethod
    def format_message(self, raw: dict) -> dict:
        pass

完整项目代码已开源在 GitHub,包含详细的使用文档和单元测试,欢迎贡献改进。这种基于官方 API 的自研方案既规避了法律风险,又能根据实际需求灵活调整,是团队知识管理的理想选择。

正文完
 0
评论(没有评论)