共计 2886 个字符,预计需要花费 8 分钟才能阅读完成。
需求场景分析与现有方案不足
在日常开发中,我们经常需要将 ChatGPT 的对话记录导出进行分析或存档。典型场景包括:

- 知识管理:将有价值的对话整理成可检索的知识库
- 内容分析:统计高频话题或用户提问模式
- 团队协作:分享技术讨论记录
然而,目前官方并未提供便捷的导出功能,网页端仅支持手动复制粘贴。现有解决方案主要有:
- 浏览器控制台爬取:不稳定且违反服务条款
- 第三方插件:存在数据安全风险
- 截图保存:无法进行后续处理
技术方案对比
我们评估了三种技术路线:
- 网页爬取:
- 优点:无需 API 权限
-
缺点:易被反爬,维护成本高
-
官方 API:
- 优点:稳定合规
-
缺点:需要开发成本
-
第三方工具:
- 优点:开箱即用
- 缺点:功能受限,隐私风险
综合比较,我们选择基于官方 API 开发自研工具,既保证合规性又能灵活定制。
核心实现
1. 异步 API 调用
使用 aiohttp 实现高效并发请求:
import aiohttp
from typing import AsyncIterator
class ChatGPTExporter:
def __init__(self, api_key: str):
self.api_key = api_key
self.session = aiohttp.ClientSession()
async def fetch_conversations(self) -> AsyncIterator[dict]:
headers = {"Authorization": f"Bearer {self.api_key}"}
async with self.session.get(
"https://api.openai.com/v1/conversations",
headers=headers
) as response:
response.raise_for_status()
data = await response.json()
for conv in data["items"]:
yield conv
2. 数据清洗管道
关键清洗步骤:
- 去除系统提示消息
- 合并连续用户消息
- 识别会话边界
- 提取元数据(时间戳、模型版本等)
def clean_message(raw: dict) -> dict:
"""标准化单条消息格式"""
return {"id": raw["id"],
"role": raw["role"],
"content": raw["content"].strip(),
"timestamp": parse_time(raw["created_at"])
}
3. 多格式导出器
以 Markdown 表格为例的生成算法:
def to_markdown(conversations: list[dict]) -> str:
"""生成带格式的 Markdown 表格"""
table = ["| 时间 | 角色 | 内容 |\n|------|------|------|"]
for msg in conversations:
row = f"| {msg['timestamp']} | {msg['role']} | {escape(msg['content'])} |"
table.append(row)
return "\n".join(table)
生产级优化
速率限制规避
实现指数退避重试机制:
async def fetch_with_retry(url: str, max_retries: int = 3) -> dict:
retry_delay = 1
for attempt in range(max_retries):
try:
return await self.session.get(url)
except RateLimitError:
await asyncio.sleep(retry_delay * (2 ** attempt))
raise APIError("Max retries exceeded")
敏感信息过滤
使用正则表达式实现关键词过滤:
SENSITIVE_PATTERNS = [r"\b(api[_-]?key|token)\b",
r"\d{4}[-]?\d{4}[-]?\d{4}[-]?\d{4}" # 信用卡号
]
def sanitize(text: str) -> str:
for pattern in SENSITIVE_PATTERNS:
text = re.sub(pattern, "[REDACTED]", text, flags=re.IGNORECASE)
return text
本地缓存设计
采用 SQLite 实现增量导出:
class ConversationCache:
def __init__(self, db_path: str):
self.conn = sqlite3.connect(db_path)
self._create_tables()
def _create_tables(self):
"""创建消息存储表"""
self.conn.execute("""
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
conversation_id TEXT,
content TEXT,
timestamp INTEGER
)""")
常见问题解决方案
会话 ID 重复
实现基于哈希值的去重逻辑:
def get_conversation_hash(msgs: list[dict]) -> str:
"""根据消息内容和时间生成唯一哈希"""
combined = "".join(f"{m['content']}{m['timestamp']}"
for m in sorted(msgs, key=lambda x: x["timestamp"])
)
return hashlib.md5(combined.encode()).hexdigest()
内存优化
对于大规模导出,采用分块处理:
- 按会话 ID 分组
- 每处理 100 个会话保存一次中间结果
- 使用生成器避免全量加载
时区处理
统一转换为 UTC 时间:
from datetime import datetime, timezone
def parse_time(timestamp: str) -> str:
dt = datetime.fromisoformat(timestamp.rstrip("Z"))
return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
扩展思考
本方案可扩展支持其他 AI 服务,关键适配点包括:
- API 客户端实现
- 消息格式转换器
- 服务特定参数配置
建议通过抽象基类定义统一接口:
from abc import ABC, abstractmethod
class AIConversationExporter(ABC):
@abstractmethod
async def fetch_messages(self) -> AsyncIterator[dict]:
pass
@abstractmethod
def format_message(self, raw: dict) -> dict:
pass
完整项目代码已开源在 GitHub,包含详细的使用文档和单元测试,欢迎贡献改进。这种基于官方 API 的自研方案既规避了法律风险,又能根据实际需求灵活调整,是团队知识管理的理想选择。
正文完
