共计 1691 个字符,预计需要花费 5 分钟才能阅读完成。
背景痛点
在企业微信的日常运营中,高频重复性问题、7×24 小时即时响应、知识沉淀与复用是三大核心痛点。例如:

- 新员工反复询问考勤制度、报销流程等基础问题
- 非工作时间无法及时响应客户咨询
- 历史问答信息分散在聊天记录中难以检索
技术选型
WorkTool vs 其他框架对比
| 特性 | WorkTool | WeChatBot | ItChat |
|---|---|---|---|
| API 稳定性 | 企业微信官方合作 | 逆向协议风险 | 封号风险高 |
| 消息协议支持 | 全量企业微信 API | 仅基础消息 | 个人微信限定 |
| 扩展性 | 支持插件化开发 | 功能单一 | 无官方 SDK |
核心实现
1. 企业微信鉴权机制
import jwt
from datetime import datetime, timedelta
class WeComAuth:
def __init__(self, corp_id: str, secret: str):
self.corp_id = corp_id
self.secret = secret
def get_token(self) -> str:
"""获取 access_token 并自动处理刷新逻辑"""
payload = {
'iss': self.corp_id,
'exp': datetime.utcnow() + timedelta(seconds=7200)
}
return jwt.encode(payload, self.secret, algorithm='HS256')
2. ChatGPT 消息适配器
async def stream_response_to_wechat(
prompt: str,
session_id: str
) -> Generator[str, None, None]:
"""处理 OpenAI 流式响应并适配企业微信消息格式"""
buffer = ""
async for chunk in openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
):
buffer += chunk.choices[0].delta.get("content", "")
# 企业微信单消息长度限制检测
if len(buffer) >= 1900:
yield buffer[:1900]
buffer = buffer[1900:]
if buffer:
yield buffer
性能优化
异步处理架构
flowchart LR
A[企业微信回调] --> B[Redis 消息队列]
B --> C{Celery Worker}
C --> D[调用 GPT API]
D --> E[结果写回企业微信]
对话缓存策略
from redis import Redis
from hashlib import md5
class DialogueCache:
def __init__(self, redis: Redis):
self.client = redis
def get_cache(self, query: str) -> Optional[str]:
key = md5(query.encode()).hexdigest()
return self.client.get(f"gpt_cache:{key}")
def set_cache(self, query: str, response: str, ttl=3600):
key = md5(query.encode()).hexdigest()
self.client.setex(f"gpt_cache:{key}", ttl, response)
避坑指南
频率限制规避
- 错峰发送:对非紧急消息添加随机延迟 (0.5- 3 秒)
- 批量消息合并:相同内容问题合并回复
- 使用企业微信「模版消息」接口提升限额
敏感词过滤
def content_filter(text: str) -> bool:
with open("sensitive_words.txt") as f:
keywords = [line.strip() for line in f]
return any(keyword in text for keyword in keywords)
延伸思考
- 如何结合 OCR 技术实现图片中的文字问答?
- 语音消息转文本处理的延迟优化方案
- 多机器人协同时的负载均衡策略
正文完
