共计 3388 个字符,预计需要花费 9 分钟才能阅读完成。
背景痛点
在企业微信群管理中,24 小时客服响应和知识库问答是常见需求。传统方式依赖人工值守,成本高且效率低。自建机器人方案虽然灵活,但需要处理企业微信复杂的 API 鉴权、消息加解密等细节,开发门槛较高。

WorkTool 作为企业微信机器人框架,提供了开箱即用的基础功能,开发者可以专注于业务逻辑开发。与自建方案相比,WorkTool 有以下优势:
- 内置企业微信 API 封装,省去了鉴权、消息解析等重复工作
- 提供插件机制,方便扩展功能
- 社区活跃,遇到问题容易找到解决方案
技术架构
整个系统的数据流程如下:
sequenceDiagram
企业微信 ->>WorkTool: 推送群消息 (加密)
WorkTool->> 企业微信: 获取 access_token
WorkTool->>WorkTool: 解密消息
WorkTool->>ChatGPT: 转发用户问题
ChatGPT->>WorkTool: 返回回答
WorkTool->> 企业微信: 加密回复消息
企业微信 ->> 群聊: 显示机器人回复
其中 access_token 是企业微信 API 调用的关键凭证,有效期为 2 小时但调用频率受限。WorkTool 内部实现了自动刷新机制,开发者无需手动维护。
核心代码实现
1. 企业微信消息解析
from werkzeug.security import check_password_hash
import xml.etree.ElementTree as ET
class WeWorkMessage:
"""处理企业微信加密消息"""
def __init__(self, token, encoding_aes_key):
self.token = token
self.aes_key = base64.b64decode(encoding_aes_key + "=")
def decrypt_msg(self, encrypted_msg):
"""解密 XML 格式消息"""
try:
# 解密逻辑...
return ET.fromstring(decrypted_xml)
except Exception as e:
logger.error(f"解密失败: {str(e)}")
raise
2. ChatGPT 异步调用
import aiohttp
from tenacity import retry, stop_after_attempt
class ChatGPTClient:
"""封装 ChatGPT API 调用"""
def __init__(self, api_key):
self.api_key = api_key
@retry(stop=stop_after_attempt(3))
async def ask(self, question, timeout=10):
"""异步提问接口"""
headers = {"Authorization": f"Bearer {self.api_key}"}
data = {"prompt": question, "max_tokens": 200}
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.openai.com/v1/completions",
json=data, headers=headers, timeout=timeout
) as resp:
if resp.status != 200:
raise Exception(f"API 错误: {await resp.text()}")
return await resp.json()
3. 敏感词过滤
import ahocorasick
class SensitiveFilter:
"""AC 自动机实现敏感词过滤"""
def __init__(self, keywords):
self.automaton = ahocorasick.Automaton()
for word in keywords:
self.automaton.add_word(word.lower(), word)
self.automaton.make_automaton()
def filter(self, text):
"""返回过滤后的文本"""
lower_text = text.lower()
for end, word in self.automaton.iter(lower_text):
text = text.replace(word, "***")
return text
性能优化方案
1. Redis 缓存上下文
使用 Redis 存储最近 5 轮对话,解决 ChatGPT 无状态问题:
import redis
from datetime import timedelta
r = redis.Redis(host='localhost', port=6379)
def get_context(user_id):
"""获取用户对话上下文"""
key = f"chat:{user_id}:context"
return r.lrange(key, 0, -1)
def save_context(user_id, question, answer):
"""保存最新对话"""
key = f"chat:{user_id}:context"
pipe = r.pipeline()
pipe.rpush(key, question, answer)
pipe.ltrim(key, -10, -1) # 保留最近 5 轮
pipe.expire(key, timedelta(hours=1))
pipe.execute()
2. Bloom Filter 去重
使用 PyBloom 过滤重复问题,降低 API 调用:
from pybloom_live import ScalableBloomFilter
bf = ScalableBloomFilter(initial_capacity=1000, error_rate=0.001)
def is_duplicate(question):
"""检查是否重复问题"""
key = question.lower().strip()
if key in bf:
return True
bf.add(key)
return False
避坑指南
- 企业微信 IP 白名单
- 登录企业微信管理后台
- 进入「应用管理」->「自建应用」
-
在「可信 IP」中添加服务器公网 IP
-
ChatGPT 参数调优
- temperature 建议 0.7-0.9 平衡创造性和稳定性
-
max_tokens 根据回答长度调整,一般 200-300
-
消息重试策略
- 采用指数退避重试:1s, 2s, 4s…
- 记录失败消息到死信队列
延伸思考
可以结合 LangChain 实现更复杂的对话管理:
from langchain import LLMChain, PromptTemplate
from langchain.memory import RedisChatMessageHistory
prompt = PromptTemplate(input_variables=["history", "input"],
template=""" 你是一个客服助手,根据对话历史回答问题:{history}
用户问:{input}
助手答:"""
)
history = RedisChatMessageHistory(
session_id="user123",
url="redis://localhost:6379/0"
)
chain = LLMChain(llm=ChatOpenAI(),
prompt=prompt,
memory=history
)
response = chain.run("你们支持哪些支付方式?")
建议为关键函数编写单元测试,例如:
import unittest
class TestMessageFilter(unittest.TestCase):
def setUp(self):
self.filter = SensitiveFilter(["暴力", "赌博"])
def test_clean_text(self):
result = self.filter.filter("拒绝赌博行为")
self.assertEqual(result, "拒绝 *** 行为")
通过本文介绍的方法,我们成功搭建了响应速度 <1 秒的企业微信智能助手。实际部署时建议添加监控告警,关注 API 调用量和响应时间指标。未来可以考虑接入更多 AI 能力,如语音识别、情感分析等,进一步提升用户体验。
正文完
