共计 3277 个字符,预计需要花费 9 分钟才能阅读完成。
核心概念:Cadence Skill 架构解析
Cadence Skill 是基于 Cadence 工作流引擎的扩展模块,采用插件化架构实现业务逻辑与工作流解耦。其核心交互流程分为三个层次:

- Worker 层:长期运行的守护进程,负责监听特定 TaskList 并执行注册的 Activity
- Activity 层 :包含具体业务逻辑的代码单元,通过
@activity_method装饰器暴露为可调用服务 - 工作流层:通过 DecisionTask 调度 Activity 执行顺序,维护工作流状态机
典型数据流如下图所示(伪代码表示):
# 工作流定义
@workflow.defn
class OrderProcessingWorkflow:
@workflow.run
async def run(self, order_id):
await validate_order(order_id) # 调用 Skill Activity
await process_payment(order_id)
# Skill Activity 实现
@activity.defn(name="validate_order")
async def validate_order(order_id: str) -> bool:
return await db.query("SELECT valid FROM orders WHERE id=?", order_id)
开发者常见痛点分析
1. 依赖地狱问题
- Python 环境与 Cadence 服务端版本不兼容(常见于 v0.10 到 v1.0 迁移期)
- 第三方库(如 boto3、psycopg2)版本冲突导致 Activity 执行异常
2. 环境隔离缺失
- 本地开发机安装的依赖污染生产环境
- 多 Skill 共用一个 Worker 导致资源竞争
3. 生产部署瓶颈
- TaskList 配置不合理引发任务堆积
- 未设置适当的 VisibilityArchival 导致历史数据膨胀
Docker 化标准安装方案
基础环境准备
-
创建隔离的 Python 环境:
FROM python:3.9-slim WORKDIR /skill # 锁定核心依赖版本 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 示例 requirements.txt 内容 cadence-client==1.2.0 protobuf==3.20.1 psutil==5.9.0 -
配置 Worker 启动脚本(
entrypoint.sh):#!/bin/bash python -m cadence.worker \ --domain ${CADENCE_DOMAIN} \ --task_list ${TASK_LIST} \ --host ${CADENCE_HOST} \ --port ${CADENCY_PORT} \ --identity $(hostname)
Activity 注册最佳实践
from cadence.activity_method import activity_method
from cadence.workerfactory import WorkerFactory
# 定义 Activity 接口
class PaymentActivities:
@activity_method(task_list="payment", schedule_to_close_timeout=30)
async def authorize(self, amount: float) -> str:
raise NotImplementedError
# 具体实现
class PaymentActivitiesImpl:
async def authorize(self, amount):
# 实际支付逻辑
return f"txn_{uuid.uuid4()}"
# Worker 初始化
factory = WorkerFactory("cadence.example.com", 7933, "example-domain")
worker = factory.new_worker("payment-tasklist")
worker.register_activities_implementation(PaymentActivitiesImpl(), "PaymentActivities")
factory.start()
生产环境关键配置
性能优化参数
| 参数名 | 建议值 | 说明 |
|---|---|---|
| max_concurrent_activity | CPU 核心数×2 | 避免过多的上下文切换开销 |
| task_rps | 50 | 单个 Worker 的请求速率上限 |
| sticky_cache_size | 1000 | 粘性工作流缓存条目数 |
IAM 最小权限示例(AWS 环境)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cadence:StartWorkflowExecution",
"cadence:RespondDecisionTaskCompleted"
],
"Resource": "arn:aws:cadence:us-east-1:1234567890:domain/example-domain"
}
]
}
避坑指南
依赖锁定策略
-
使用
pip-compile生成精确依赖树:pip install pip-tools pip-compile --generate-hashes requirements.in > requirements.txt -
多环境差异处理:
# Makefile 示例 dev-deps: requirements-dev.txt pip install -r $< prod-deps: requirements.txt pip install -r $< --no-deps
环境配置分离
采用 12-Factor 应用原则管理配置:
import os
from cadence.workerfactory import WorkerFactory
factory = WorkerFactory(host=os.getenv("CADENCE_HOST"),
port=int(os.getenv("CADENCE_PORT", "7933")),
domain=os.getenv("CADENCE_DOMAIN")
)
验证与测试
基础功能测试
# 使用 cadence-cli 验证 Worker 注册
cadence --do example-domain adm cl list tasklists
# 触发测试工作流
cadence --do example-domain wf start --tl demo-tasklist --wt 60 --w demo_workflow
负载测试方案
-
使用 locust 模拟请求:
from locust import task, between from cadence.client import WorkflowClient class CadenceUser(HttpUser): wait_time = between(0.5, 2) @task def trigger_workflow(self): client = WorkflowClient.new_client("cadence-server:7933", "example-domain") workflow = client.new_workflow_stub(MyWorkflow) workflow.start(args=...) -
监控关键指标:
cadence_worker_poller_count{tasklist="payment"} cadence_activity_execution_latency_bucket
总结建议
对于生产级 Skill 部署,建议遵循以下原则:
- 使用容器化封装保证环境一致性
- 通过
max_concurrent_activity控制资源使用 - 为每个 Skill 分配独立的 TaskList
- 定期清理已完成的工作流历史记录
通过本文介绍的方法,开发者可以构建出具备生产可用性的 Cadence Skill 服务。实际部署时还需结合具体业务场景调整参数,建议通过渐进式发布策略验证稳定性。
正文完
