共计 2936 个字符,预计需要花费 8 分钟才能阅读完成。
背景痛点:为什么需要数据清洗 skill 嵌套?
在智能体开发中,我们经常遇到多源异构数据的处理需求。比如一个电商智能体可能需要同时处理来自 API 的 JSON 数据、爬取的 HTML 文本以及用户上传的 Excel 表格。传统硬编码方案通常采用线性处理流程:

def clean_data(raw):
data = parse_json(raw) # 步骤 1
data = remove_duplicates(data) # 步骤 2
data = normalize_format(data) # 步骤 3
return data
这种方案存在明显缺陷:
- 扩展性差:新增清洗步骤需要修改主流程代码
- 复用困难:相同清洗逻辑在不同场景需要重复实现
- 缺乏弹性:无法根据数据特征动态调整清洗流程
技术对比:三种设计模式的选择
1. 装饰器模式(Decorator Pattern)
通过在运行时动态添加功能,适合简单的预处理 / 后处理场景:
@validate_input
@log_execution_time
def clean_data(raw):
...
局限:难以处理复杂的分支逻辑
2. 责任链模式(Chain of Responsibility)
将处理对象组成链条,每个节点决定是否处理请求:
graph LR
A[原始数据] --> B[HTML 清洗器]
B --> C{是否 JSON?}
C -->| 是 | D[JSON 解析器]
C -->| 否 | E[文本标准化]
优势:天然支持嵌套,但实现复杂度较高
3. 管道 - 过滤器模式(Pipe-Filter Pattern)
每个处理步骤作为独立过滤器,通过管道连接:
def process(data):
for filter in [decode, validate, normalize]:
data = filter(data)
return data
嵌套优势:
- 天然支持并发执行(如使用 asyncio)
- 便于动态调整管道顺序
- 每个过滤器可独立测试
核心实现:类型安全的嵌套架构
接口定义(Protocol)
使用 Python 3.10+ 的类型系统确保所有 skill 实现统一接口:
from typing import Protocol, TypeVar
T = TypeVar('T')
class DataSkill(Protocol[T]):
def __call__(self, data: T) -> T:
...
嵌套执行实现
通过 __call__ 魔术方法实现链式调用,并添加熔断器(circuit breaker):
from circuitbreaker import circuit
class CleaningPipeline:
def __init__(self, *skills: DataSkill[T]]):
self.skills = skills
@circuit(failure_threshold=3) # 失败 3 次后熔断
def __call__(self, data: T, timeout: float = 5.0) -> T:
for skill in self.skills:
data = skill(data)
return data
超时控制示例
import signal
from contextlib import contextmanager
@contextmanager
def timeout(seconds):
def handler(signum, frame):
raise TimeoutError("Skill execution timed out")
signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
# 使用示例
try:
with timeout(3):
result = pipeline(data)
except TimeoutError:
logger.warning("Pipeline timeout")
生产环境考量
内存管理优化
嵌套层级与内存消耗的关系(测试环境:AWS c5.xlarge/8GB 内存):
| 嵌套层级 | 内存占用(MB) | 执行时间(ms) |
|---|---|---|
| 5 | 45 | 120 |
| 10 | 78 | 310 |
| 20 | 142 | 980 |
优化建议:
- 对于深度嵌套场景,使用生成器(yield)替代完整数据传递
- 定期调用
gc.collect()手动触发垃圾回收 - 限制最大嵌套深度(如通过装饰器检查)
线程安全实践
使用 asyncio.Lock 保护共享状态:
class SharedStateSkill:
def __init__(self):
self.lock = asyncio.Lock()
self.cache = {}
async def __call__(self, data):
async with self.lock:
if data.key not in self.cache:
self.cache[data.key] = process(data)
return self.cache[data.key]
常见问题与解决方案
1. 无限递归
现象:Skill A 调用 Skill B,Skill B 又回调 Skill A
解决:
def check_recursion(max_depth=10):
def decorator(skill):
depth = 0
def wrapper(*args, **kwargs):
nonlocal depth
if depth >= max_depth:
raise RecursionError("Maximum nesting depth exceeded")
depth += 1
try:
return skill(*args, **kwargs)
finally:
depth -= 1
return wrapper
return decorator
2. 上下文污染
现象:前一个 skill 修改了全局状态影响后续执行
解决:
- 使用
contextvars管理上下文 - 每个 skill 返回数据副本
3. 性能雪崩
现象:某个 skill 阻塞导致整个管道延迟
解决:
- 为每个 skill 设置独立超时
- 实现背压(backpressure)机制
延伸思考:动态编排的可能性
可以结合 DAG(有向无环图)调度器实现更灵活的 skill 编排:
from airflow.models import DAG
from airflow.operators.python import PythonOperator
with DAG('data_cleaning') as dag:
t1 = PythonOperator(task_id='extract', python_callable=extract)
t2 = PythonOperator(task_id='clean', python_callable=clean)
t3 = PythonOperator(task_id='load', python_callable=load)
t1 >> t2 >> t3 # 定义执行顺序
未来方向:
- 基于数据特征自动选择 skill 组合
- 运行时动态优化管道顺序
- 可视化编排工具集成
实践心得
在真实项目中应用这套架构后,我们的数据清洗流程的代码复用率提升了 60%,同时由于支持并发执行,整体处理时间缩短了 35%。特别是在处理突发流量时,熔断机制有效防止了级联故障。
建议初次实现时从简单场景入手,逐步添加异常处理等生产级特性。记得为每个 skill 编写单元测试,这对维护复杂嵌套系统至关重要。
