飞书接入ChatGPT实战指南:从零搭建智能对话机器人

3次阅读
没有评论

共计 3590 个字符,预计需要花费 9 分钟才能阅读完成。

image.webp

背景痛点

企业 IM 工具(如飞书)接入 AI 服务的需求日益增长,特别是在客服自动化和知识库问答场景。传统人工客服效率低、响应慢,而 AI 机器人可以 7 ×24 小时在线,快速回答常见问题。知识库问答场景中,AI 可以快速检索并返回结构化信息,大幅提升员工效率。

然而,企业级应用对接面临诸多挑战:

  • 需要处理高并发请求
  • 必须保证数据安全性和隐私性
  • 需要维护对话上下文
  • 应对 API 调用限制和配额管理

技术选型

飞书开放平台提供了两种主要的集成方式:Webhook 和机器人 API。我们对比两者的特点:

  • Webhook
  • 需要公网可访问的回调地址
  • 适合事件驱动型应用
  • 需要处理加密验证

  • 机器人 API

  • 主动调用飞书 API
  • 适合定时任务或主动推送
  • 需要管理 access_token

对于 ChatGPT 集成,我们选择 Webhook 方式,原因如下:

  1. 实时性更好,消息到达立即触发
  2. 飞书服务器负责重试机制,更可靠
  3. 与 ChatGPT 的异步响应特性更匹配

选择 ChatGPT API 而非其他 AI 服务,主要考虑其强大的自然语言理解能力和丰富的知识库。

核心实现

飞书事件订阅配置

  1. 登录飞书开放平台,创建自建应用
  2. 在 ” 权限管理 ” 中申请以下权限:
  3. 获取单聊、群聊消息
  4. 发送消息
  5. 接收消息 v2.0
  6. 在 ” 事件订阅 ” 中添加消息接收事件
  7. 配置 Encrypt Key 和 Verification Token

飞书接入 ChatGPT 实战指南:从零搭建智能对话机器人

Python 代码实现

import hashlib
import base64
from typing import Dict, Any

# 验证飞书请求签名
def verify_signature(
    timestamp: str, 
    nonce: str, 
    signature: str, 
    encrypt_key: str, 
    body: str
) -> bool:
    """
    验证飞书 Webhook 请求签名
    :param timestamp: 请求头中的时间戳
    :param nonce: 请求头中的随机数
    :param signature: 请求头中的签名
    :param encrypt_key: 飞书应用的 Encrypt Key
    :param body: 请求体原始内容
    :return: 验证结果
    """content = f"{timestamp}{nonce}{encrypt_key}{body}".encode('utf-8')
    sha1 = hashlib.sha1(content).hexdigest()
    return sha1 == signature

# 解析加密消息
def decrypt_message(
    encrypt: str, 
    encrypt_key: str
) -> str:
    """
    解密飞书加密消息
    :param encrypt: 加密消息体
    :param encrypt_key: 飞书应用的 Encrypt Key
    :return: 解密后的消息
    """
    # 飞书使用 AES-256-CBC 加密
    # 实现细节省略...
    pass

流式响应处理

使用 aiohttp 实现 ChatGPT 流式响应:

import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def call_chatgpt_stream(messages: list[dict], 
    api_key: str
) -> AsyncGenerator[str, None]:
    """
    调用 ChatGPT 流式 API
    :param messages: 对话消息列表
    :param api_key: OpenAI API 密钥
    :yield: 流式返回的文本片段
    """url ="https://api.openai.com/v1/chat/completions"headers = {"Authorization": f"Bearer {api_key}","Content-Type":"application/json"
    }
    data = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=data) as resp:
            if resp.status != 200:
                error = await resp.json()
                raise Exception(f"API error: {error}")

            async for line in resp.content:
                if line.startswith(b"data:"):
                    chunk = line[6:].strip()
                    if chunk == b"[DONE]":
                        break
                    try:
                        data = json.loads(chunk)
                        yield data["choices"][0]["delta"].get("content", "")
                    except json.JSONDecodeError:
                        continue

生产考量

对话上下文管理

使用 Redis 存储对话 session 的推荐方案:

import redis
from datetime import timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)

class SessionManager:
    @staticmethod
    def get_session_key(chat_id: str) -> str:
        return f"chat_session:{chat_id}"

    @staticmethod
    def save_messages(chat_id: str, messages: list[dict], ttl: int = 3600):
        key = SessionManager.get_session_key(chat_id)
        redis_client.setex(key, timedelta(seconds=ttl), json.dumps(messages))

    @staticmethod
    def load_messages(chat_id: str) -> list[dict]:
        key = SessionManager.get_session_key(chat_id)
        data = redis_client.get(key)
        return json.loads(data) if data else []

敏感词过滤

使用正则表达式实现基础过滤:

import re

class ContentFilter:
    def __init__(self):
        self.patterns = [r'(?i)password|secret|token|api[_-]?key',
            r'\d{4}[-]?\d{4}[-]?\d{4}[-]?\d{4}'  # 信用卡号
        ]

    def filter(self, text: str) -> str:
        for pattern in self.patterns:
            text = re.sub(pattern, '[FILTERED]', text)
        return text

性能压测

使用 Locust 模拟 100+ 并发请求的测试脚本:

from locust import HttpUser, task, between

class ChatBotUser(HttpUser):
    wait_time = between(0.5, 2.5)

    @task
    def send_message(self):
        headers = {"Content-Type": "application/json"}
        data = {
            "message": "How to reset my password?",
            "chat_id": "test_123"
        }
        self.client.post("/webhook", json=data, headers=headers)

避坑指南

  1. 飞书消息体字段大小写问题
  2. 飞书 API 返回的 JSON 字段有时使用驼峰式 (camelCase),有时使用下划线式 (snake_case)
  3. 建议统一处理字段名,或使用 pydantic 模型进行转换

  4. ChatGPT token 超限的 fallback 策略

  5. 监控 token 使用情况,当接近限制时切换较小模型
  6. 实现缓存机制,对相似问题返回缓存答案
  7. 返回友好错误信息,如 ” 当前请求过多,请稍后再试 ”

  8. 多租户场景下的隔离方案

  9. 为每个租户分配独立的 Redis 数据库或前缀
  10. 使用不同的 OpenAI API key 配额
  11. 记录详细的请求日志用于审计

总结

本文详细介绍了飞书接入 ChatGPT 的全流程实现方案。从事件订阅配置到核心代码实现,再到生产环境的各种考量,希望能帮助开发者快速搭建企业级智能对话机器人。实际部署时,还需要考虑监控告警、日志收集等运维工作,以及持续优化对话体验。

正文完
 0
评论(没有评论)