节点接入指南
本指南介绍 AgentFlow SDK 的深入使用方法,包括配置管理、异常处理、容灾机制。
前置要求
| 软件 | 版本 |
|---|---|
| Python | 3.8+ |
| pip | 最新版 |
pip install agentflow-sdk
配置管理(AgentFlowConfig)
SDK 通过 AgentFlowConfig 统一管理所有配置。
代码配置
from agentflow import AgentFlowConfig, ServiceWorker
config = AgentFlowConfig(
api_base="https://your-platform.com",
api_timeout=30.0, # API 超时(秒)
retry_max=3, # 最大重试次数
retry_base_delay=1.0, # 重试基础延迟(秒)
retry_max_delay=30.0, # 重试最大延迟(秒)
circuit_failure_threshold=5, # 熔断器失败阈值
circuit_recovery_timeout=30.0, # 熔断恢复超时(秒)
rate_limit_rps=10.0, # 限流:每秒请求数
rate_limit_burst=20, # 限流:突发大小
heartbeat_interval=30.0, # 心跳间隔(秒)
poll_interval=5.0, # 拉取间隔(秒)
log_level="INFO", # 日志级别(DEBUG/INFO/WARNING/ERROR)
log_format="json", # 日志格式(json/text)
metrics_enabled=False, # 指标收集
metrics_port=9090,
state_persistence_enabled=False, # 状态持久化
state_storage_path="./agentflow_state"
)
worker = ServiceWorker(
node_id="your-node-id",
node_secret="your-node-secret",
config=config
)
环境变量配置
export AGENTFLOW_API_BASE=https://your-platform.com
export AGENTFLOW_API_TIMEOUT=30.0
export AGENTFLOW_RETRY_MAX=3
export AGENTFLOW_LOG_LEVEL=INFO
config = AgentFlowConfig.from_env()
JSON 文件配置
config = AgentFlowConfig.from_file("agentflow_config.json")
config.to_file("agentflow_config.json") # 保存
异常处理
SDK 提供层次化异常类型,便于精确错误处理。
异常层次
AgentFlowError (基类)
├── AuthenticationError # 认证失败
├── ConnectionError # 网络连接故障
├── RateLimitError # 请求频率过高
├── CircuitBreakerOpenError # 熔断器开启
├── TaskNotFoundError # 任务不存在
├── TaskStateError # 任务状态错误
├── ProtocolError # 协议格式错误
├── TimeoutError # 操作超时
├── ValidationError # 参数校验失败
├── ConfigurationError # 配置错误
├── QuotaError # 额度不足
├── FileDownloadError # 文件下载失败
└── ServiceError # 服务端错误
使用示例
from agentflow import RequesterClient
from agentflow.types.exceptions import (
QuotaError, TimeoutError, ServiceError, ConnectionError
)
client = RequesterClient(api_key="af_live_xxx")
try:
result = client.run_and_wait("svc-id", {"text": "hello"}, timeout=300)
except QuotaError as e:
print(f"额度不足: 需要{e.required}, 可用{e.available}")
except TimeoutError as e:
print(f"超时: {e.timeout}s")
except ConnectionError as e:
print(f"连接失败: {e.url}")
except ServiceError as e:
print(f"服务错误: HTTP {e.status_code} - {e.error_body}")
except AgentFlowError as e:
print(f"其他错误: {e}")
重试与熔断
指数退避重试
SDK 内置自动重试(可重试状态码:429、500、502、503、504):
config = AgentFlowConfig(
retry_max=3, # 最大重试次数
retry_base_delay=1.0, # 基础延迟(秒)
retry_max_delay=30.0 # 最大延迟(秒)
)
# 实际延迟: base * 2^attempt + jitter
# 第1次重试: ~1-2s, 第2次: ~2-4s, 第3次: ~4-8s
熔断器保护
连续失败达到阈值后自动熔断,避免雪崩:
config = AgentFlowConfig(
circuit_failure_threshold=5, # 连续 5 次失败触发熔断
circuit_recovery_timeout=30.0 # 30 秒后尝试恢复
)
熔断器状态:关闭 → 打开(拒绝请求)→ 半开(试探性恢复)→ 关闭
令牌桶限流
config = AgentFlowConfig(
rate_limit_rps=10.0, # 每秒 10 个请求
rate_limit_burst=20 # 突发最多 20 个
)
状态持久化
启用后 SDK 会将任务状态保存到磁盘,重启后可恢复未完成任务。
config = AgentFlowConfig(
state_persistence_enabled=True,
state_storage_path="./agentflow_state"
)
worker = ServiceWorker(node_id="...", node_secret="...", config=config)
worker.register_service("svc-1", handler)
# 启动前恢复未完成任务
recovered = worker.recover_incomplete_tasks()
print(f"恢复了 {recovered} 个未完成任务")
worker.start()
文件下载
处理需求方上传的文件:
def file_handler(input_data: dict, ctx: TaskContext) -> dict:
file_url = input_data.get("file_url")
if file_url:
# 下载到本地
local_path = ctx._client.download_file(
file_url,
local_path="./workspace/input.pdf",
timeout=60
)
try:
result = process_file(local_path)
finally:
import os
if os.path.exists(local_path):
os.remove(local_path)
return {"machine_data": result, "ui_content": [...]}
上下文管理器
SDK 支持 with 语句,自动管理资源生命周期:
# 离开 with 块时自动关闭 HTTP 连接
with ServiceWorker(node_id, node_secret) as worker:
worker.register_service("svc-1", handler)
worker.start()
心跳机制
节点通过启停信号管理上线/下线状态:
- 启动时:自动发送上线信号,关联服务状态变为"在线"
- 运行时:SDK 定期发送心跳(默认 30s),保持在线
- 停止时:自动发送下线信号,关联服务状态变为"离线"
- 离线判定:超过 3 分钟无心跳,平台自动标记为离线
指标收集
config = AgentFlowConfig(
metrics_enabled=True,
metrics_port=9090
)
worker = ServiceWorker(node_id, node_secret, config=config)
# 指标可通过 http://localhost:9090 访问
# 代码中获取
metrics = worker.get_metrics()
stats = worker.get_http_stats()
最佳实践
错误处理
def robust_handler(input_data: dict, ctx: TaskContext) -> dict:
try:
result = your_logic(input_data)
return {"machine_data": result, "ui_content": [...]}
except Exception as e:
# 友好错误信息,SDK 会调用 fail 接口退还额度
return {
"machine_data": {"error": str(e)},
"ui_content": [{"type": "markdown", "content": f"## 处理失败
{str(e)}"}]
}
环境变量管理凭证
import os
worker = ServiceWorker(
node_id=os.getenv("AGENTFLOW_NODE_ID"),
node_secret=os.getenv("AGENTFLOW_NODE_SECRET"),
api_base=os.getenv("AGENTFLOW_API_BASE", "https://your-platform.com")
)
日志调试
config = AgentFlowConfig(log_level="DEBUG", log_format="text")
# 或环境变量: LOG_LEVEL=debug
常见问题
Q: 节点离线? 检查 SDK 是否运行、网络是否正常。重启 SDK 会自动上线。
Q: 多机器共享节点? 每台机器创建独立节点,不要共用凭证。
Q: 更新处理函数? 修改代码重启 Worker 即可,正在执行的任务不受影响。
Q: 处理函数抛异常? SDK 会捕获并自动调用 fail 接口,额度返还需求方。