共计 2328 个字符,预计需要花费 6 分钟才能阅读完成。
背景痛点:为什么需要自定义 Skill
LangGraph 作为工作流编排工具,原生支持基础的线性流程和条件分支。但在真实业务场景中,我们经常遇到这样的问题:

- 需要调用第三方 API(如支付网关、OCR 服务)
- 执行领域特定逻辑(如医疗报告解析、金融风控规则)
- 复用已有代码模块(如企业内部算法服务)
原生工作流直接嵌入这些逻辑会导致:
- 代码臃肿:业务逻辑与流程控制代码混杂
- 难以维护:每次修改都需要重新部署整个图
- 复用困难:相同功能在不同工作流中重复实现
技术方案:Graph Node 扩展模式
方案选型对比
- 直接修改源码:
- 优点:快速验证想法
-
缺点:破坏框架完整性,升级困难
-
Graph Node 扩展:
- 优点:符合开闭原则,独立演进
- 缺点:需要设计接口规范
核心实现机制
1. Skill 接口定义
from abc import ABC, abstractmethod
from typing import Any, Dict
class BaseSkill(ABC):
"""Skill 标准接口"""
@abstractmethod
async def execute(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""
执行入口
:param state: 工作流上下文状态
:return: 更新后的状态片段
"""
pass
2. 上下文传递实现
通过 State 对象的 _skill_namespace 隔离不同 Skill 的变量:
def skill_node(state: dict, skill: BaseSkill):
# 自动生成命名空间(可用类名 +hash)namespace = f"skill_{skill.__class__.__name__}"
# 执行并获取增量状态
result = await skill.execute(state.get(namespace, {})
)
# 合并状态(深拷贝避免污染)return {
**state,
namespace: {**state.get(namespace, {}),
**result
}
}
3. 完整集成示例
# 定义气象查询 Skill
class WeatherQuerySkill(BaseSkill):
async def execute(self, state):
location = state["location"]
# 模拟 API 调用
return {
"temperature": 25.6,
"humidity": 0.78
}
# 构建工作流
builder = GraphBuilder()
builder.add_node(
"query_weather",
partial(skill_node, skill=WeatherQuerySkill())
)
builder.set_entry_point("query_weather")
graph = builder.compile()
# 执行工作流
await graph.run({"location": "Beijing"})
生产级优化策略
性能调优
-
异步批处理:
async def batch_execute(self, states: List[dict]): # 合并同类 API 请求 locations = [s["location"] for s in states] responses = await weather_api.batch_query(locations) return [{"data": r} for r in responses] -
缓存策略:
from datetime import timedelta from cachetools import TTLCache class CachedSkill(BaseSkill): def __init__(self): self.cache = TTLCache(maxsize=1000, ttl=timedelta(minutes=30))
安全防护
-
输入验证装饰器
def validate_input(schema: dict): def decorator(method): def wrapper(self, state): validate(state, schema) # 使用 jsonschema 等库 return method(self, state) return wrapper return decorator -
敏感数据过滤
class PaymentSkill(BaseSkill): async def execute(self, state): return {"masked_card": mask_number(state["card_no"]), "transaction_id": generate_uuid()}
稳定性保障
-
熔断机制:
from circuitbreaker import circuit @circuit(failure_threshold=5, recovery_timeout=60) async def execute(self, state): # 业务逻辑 -
幂等性设计:
class IdempotentSkill(BaseSkill): def __init__(self): self.processed_ids = set() async def execute(self, state): if state["request_id"] in self.processed_ids: return {"status": "duplicate"} # 正常处理...
实践思考
这种架构在实际项目中带来了明显收益:
- 团队协作效率提升:前端可独立开发 Skill
- 故障隔离:单个 Skill 异常不影响整体流程
- 监控细化:每个 Skill 可单独采集 Metrics
遗留的开放性问题是:如何在不重启服务的情况下,实现 Skill 的版本热更新?欢迎在评论区分享你的解决方案。
正文完
