共计 3811 个字符,预计需要花费 10 分钟才能阅读完成。
在构建复杂智能体系统时,数据预处理往往是决定整个系统质量和效率的关键环节。今天我想和大家分享一种通过 skill 嵌套来解决复杂数据清洗场景的实践经验。

背景与痛点
实际项目中,我们经常会遇到这样的数据处理流程:
- 原始数据(可能来自多个异构数据源)
- 基础清洗(去重、缺失值处理、格式标准化等)
- 特征提取(从原始数据中提取有意义的特征)
- 业务规则校验(确保数据符合特定业务逻辑)
传统做法中,我们可能会把这些步骤写成一系列顺序执行的函数。但这种方法存在几个明显问题:
- 错误处理困难:某个步骤出错时难以区分是原始数据问题还是处理逻辑问题
- 复用性差:相似的清洗步骤在不同流程中需要重复实现
- 调试困难:当处理流程很长时,难以追踪数据在每个步骤中的变化
技术对比:普通函数 vs Skill 嵌套
让我们先看看两种实现方式的对比:
普通函数调用方式
def process_data(data):
data = clean_raw_data(data) # 基础清洗
data = extract_features(data) # 特征提取
data = validate_business_rules(data) # 业务校验
return data
这种方式的问题在于:
- 所有步骤耦合在一起
- 中间结果难以单独测试
- 错误发生时难以定位问题步骤
Skill 嵌套方式
@data_skill
def clean_raw_data(data: RawData) -> CleanedData:
...
@data_skill
def extract_features(data: CleanedData) -> FeatureData:
...
@data_skill
def validate_business_rules(data: FeatureData) -> ValidatedData:
...
# 组合使用
pipeline = clean_raw_data >> extract_features >> validate_business_rules
result = pipeline(raw_data)
Skill 嵌套的优势:
- 每个步骤独立可测试
- 类型安全(通过类型标注确保步骤间的数据兼容性)
- 错误隔离(每个步骤的错误不会影响其他步骤)
- 灵活组合(可以像搭积木一样组合不同 skill)
核心实现
使用装饰器实现 Skill 嵌套
首先我们定义一个基础装饰器来创建 skill:
from typing import TypeVar, Callable, Any, Optional
from functools import wraps
T = TypeVar('T')
SkillFunc = Callable[[T], Any]
class DataSkillError(Exception):
"""自定义异常类型"""
pass
def data_skill(func: SkillFunc) -> SkillFunc:
"""
将普通函数转换为数据 skill 的装饰器
:param func: 要装饰的函数
:return: 装饰后的函数,具有 skill 能力
"""
@wraps(func)
def wrapper(data: T, *args, **kwargs) -> Any:
try:
# 在这里可以添加预处理逻辑
result = func(data, *args, **kwargs)
# 在这里可以添加后处理逻辑
return result
except Exception as e:
raise DataSkillError(f"Skill {func.__name__} failed: {str(e)}") from e
# 添加组合操作符
wrapper.__or__ = lambda self, other: SkillPipeline(self) >> other
return wrapper
Skill 管道实现
为了让 skill 可以像管道一样组合,我们需要实现一个管道类:
class SkillPipeline:
"""处理 skill 组合执行的管道类"""
def __init__(self, *skills: SkillFunc):
self.skills = skills
def __call__(self, data: Any) -> Any:
"""执行管道中的所有 skill"""
result = data
for skill in self.skills:
result = skill(result)
return result
def __rshift__(self, other: SkillFunc) -> 'SkillPipeline':
"""重载 >> 操作符,用于组合 skill"""
return SkillPipeline(*self.skills, other)
状态管理
对于需要共享状态的场景,我们可以使用闭包:
@data_skill
def create_counter_skill() -> SkillFunc:
"""创建一个带有计数器的 skill"""
count = 0
@data_skill
def counter_skill(data: Any) -> Any:
nonlocal count
count += 1
print(f"Skill has been called {count} times")
return data
return counter_skill
性能优化
缓存机制
对于计算量大的 skill,可以添加缓存:
from functools import lru_cache
@data_skill
@lru_cache(maxsize=128)
def expensive_computation(data: Tuple) -> Any:
"""带有缓存的耗时计算 skill"""
# 模拟耗时计算
time.sleep(0.1)
return data
流式处理
对于大数据集,可以使用生成器实现流式处理:
@data_skill
def stream_processor(data_iter: Iterable) -> Iterable:
"""流式处理 skill"""
for item in data_iter:
# 对每个元素进行处理
processed = process_item(item)
yield processed
内存占用对比(处理 100 万条记录):
- 传统方式:约 800MB
- 流式处理:约 50MB
避坑指南
循环引用检测
当 skill 互相依赖时,可能会产生循环引用。我们可以添加检测逻辑:
def check_circular_dependency(skills: List[SkillFunc]) -> bool:
"""检查 skill 列表中是否存在循环依赖"""
dependency_graph = {}
for skill in skills:
dependencies = getattr(skill, '_dependencies', set())
dependency_graph[skill.__name__] = dependencies
visited = set()
path = set()
def visit(vertex):
if vertex in path:
return True
path.add(vertex)
for neighbor in dependency_graph.get(vertex, set()):
if visit(neighbor):
return True
path.remove(vertex)
visited.add(vertex)
return False
for vertex in dependency_graph:
if vertex not in visited:
if visit(vertex):
return True
return False
日志追踪
为了便于调试,我们需要在 skill 间传递上下文 ID:
import uuid
def traced_skill(func: SkillFunc) -> SkillFunc:
"""添加追踪能力的装饰器"""
@wraps(func)
def wrapper(data: Any, context_id: Optional[str] = None, *args, **kwargs) -> Any:
if context_id is None:
context_id = str(uuid.uuid4())
print(f"[{context_id}] Entering {func.__name__}")
try:
result = func(data, *args, **kwargs)
print(f"[{context_id}] Exiting {func.__name__}")
return result
except Exception as e:
print(f"[{context_id}] Error in {func.__name__}: {str(e)}")
raise
return wrapper
延伸思考
虽然 skill 嵌套提供了很多便利,但并不是所有场景都适用。以下情况可能不适合使用嵌套:
- 非常简单的数据处理流程(可能增加不必要的复杂度)
- 需要极高性能的场景(装饰器和管道会带来额外开销)
- 步骤间有复杂依赖关系的场景
在实际项目中,我建议先从小规模开始尝试,评估效果后再决定是否大规模采用。
总结
通过 skill 嵌套的方式组织数据清洗流程,我们获得了以下优势:
- 更好的模块化和复用性
- 更清晰的错误隔离和调试
- 更灵活的处理流程组合
- 类型安全的处理管道
当然,这种方法也需要一些额外的基础设施代码,但长远来看,对于复杂的智能体系统,这种投入是值得的。希望这篇文章能给你带来启发,也欢迎分享你在数据预处理方面的经验和想法。
正文完
