共计 2253 个字符,预计需要花费 6 分钟才能阅读完成。
业务场景驱动设计
最近在帮某电商平台升级客服系统时遇到典型需求:用户咨询 ” 订单未收到 ” 时,Agent 需要自动完成:
1. 物流状态查询(调用快递 API)
2. 异常识别(机器学习模型)
3. 工单分类(规则引擎)
4. 回复话术生成(NLG 模块)

另一个金融场景更复杂:风险咨询需串联反欺诈检测、客户画像分析、合规审查三个技能链。这些 case 暴露了传统方案的不足:
- 规则引擎 适合明确逻辑(如工单分类),但维护成本随复杂度指数上升
- 纯 ML 方案 在模糊匹配(如情绪识别)表现好,但缺乏可解释性
核心架构设计
技能编排引擎
采用 DAG(有向无环图)模型实现技能编排,如下图所示:
graph LR
A[物流查询] --> B[异常检测]
B --> C{是否异常?}
C -->| 是 | D[工单生成]
C -->| 否 | E[标准回复]
关键设计点:
1. 每个技能是独立 Python class,需实现 async execute(ctx) 方法
2. 通过 @skill_registry.register 装饰器注册技能
3. DAG 定义使用 YAML 描述依赖关系
异常处理机制
完整代码示例:
class SkillRunner:
async def run_dag(self, dag_def: dict):
executed = [] # 记录执行顺序用于回滚
ctx = SkillContext()
try:
for skill_name in topological_sort(dag_def):
skill = skill_registry.get(skill_name)
executed.append(skill_name)
# 带超时控制的执行
try:
await asyncio.wait_for(skill.execute(ctx),
timeout=SKILL_TIMEOUT
)
except asyncio.TimeoutError:
await self._rollback(executed, ctx)
raise SkillTimeoutError(skill_name)
except Exception as e:
await self._rollback(executed, ctx)
raise
async def _rollback(self, skills: list, ctx: SkillContext):
for skill_name in reversed(skills):
skill = skill_registry.get(skill_name)
if hasattr(skill, 'rollback'):
await skill.rollback(ctx)
时间复杂度分析:
– DAG 拓扑排序:O(V+E)
– 技能执行:O(N)平均技能耗时
– 回滚操作:O(N)平均回滚耗时
性能优化实战
上下文共享优化
典型内存问题:每个技能都复制上下文会导致 O(N)内存增长。解决方案:
class SkillContext:
def __init__(self):
self._data = {}
self._dirty_flags = set()
def set_data(self, key: str, value: Any, owner: str):
if key in self._data and owner not in self._dirty_flags:
raise ContextConflictError(f"{key} already exists")
self._data[key] = value
self._dirty_flags.add(owner)
通过写时检查避免数据污染,实测内存占用降低 62%
熔断设计
基于滑动窗口统计失败率:
class CircuitBreaker:
def __init__(self, max_failures=3, window_sec=30):
self.failure_count = 0
self.last_failure_time = 0
async def __call__(self, skill_func):
if time.time() - self.last_failure_time > self.window_sec:
self.failure_count = 0
if self.failure_count >= self.max_failures:
raise CircuitOpenError
try:
return await skill_func()
except Exception:
self.failure_count += 1
self.last_failure_time = time.time()
raise
生产环境要点
灰度发布策略
- 新技能版本部署到 5% 的 Agent 节点
- 通过 A / B 测试对比关键指标:
- 技能成功率
- 平均响应时间
- 下游技能触发率
- 全量前必须检查:
- 上下文 schema 兼容性
- 回滚脚本有效性
安全审查清单
- 输入输出过滤:
def sanitize_input(text: str) -> str: # 移除敏感信息(身份证 / 银行卡号等)pattern = r'\d{18}|\d{16}' return re.sub(pattern, '[REDACTED]', text) - 权限控制:每个技能声明所需权限级别
- 审计日志:记录原始输入和关键决策点
开放性问题
- 跨平台技能复用如何解决上下文差异?(如电商客服技能复用给银行场景)
- 动态 DAG 调整有哪些可行方案?(运行时根据上下文修改技能流)
- 如何设计技能市场让第三方开发者安全贡献能力?
从实际落地来看,这套架构已在日均百万级请求的系统中稳定运行。最难的不是技术实现,而是平衡灵活性与可控性。建议从小规模关键链路开始试点,逐步积累技能资产。
正文完