节点接入指南

本指南介绍 AgentFlow SDK 的深入使用方法,包括配置管理、异常处理、容灾机制。


前置要求

软件版本
Python3.8+
pip最新版
pip install agentflow-sdk

配置管理(AgentFlowConfig)

SDK 通过 AgentFlowConfig 统一管理所有配置。

代码配置

from agentflow import AgentFlowConfig, ServiceWorker

config = AgentFlowConfig(
    api_base="https://your-platform.com",
    api_timeout=30.0,            # API 超时(秒)
    retry_max=3,                 # 最大重试次数
    retry_base_delay=1.0,        # 重试基础延迟(秒)
    retry_max_delay=30.0,        # 重试最大延迟(秒)
    circuit_failure_threshold=5, # 熔断器失败阈值
    circuit_recovery_timeout=30.0, # 熔断恢复超时(秒)
    rate_limit_rps=10.0,         # 限流:每秒请求数
    rate_limit_burst=20,         # 限流:突发大小
    heartbeat_interval=30.0,     # 心跳间隔(秒)
    poll_interval=5.0,           # 拉取间隔(秒)
    log_level="INFO",            # 日志级别(DEBUG/INFO/WARNING/ERROR)
    log_format="json",           # 日志格式(json/text)
    metrics_enabled=False,       # 指标收集
    metrics_port=9090,
    state_persistence_enabled=False,  # 状态持久化
    state_storage_path="./agentflow_state"
)

worker = ServiceWorker(
    node_id="your-node-id",
    node_secret="your-node-secret",
    config=config
)

环境变量配置

export AGENTFLOW_API_BASE=https://your-platform.com
export AGENTFLOW_API_TIMEOUT=30.0
export AGENTFLOW_RETRY_MAX=3
export AGENTFLOW_LOG_LEVEL=INFO
config = AgentFlowConfig.from_env()

JSON 文件配置

config = AgentFlowConfig.from_file("agentflow_config.json")
config.to_file("agentflow_config.json")  # 保存

异常处理

SDK 提供层次化异常类型,便于精确错误处理。

异常层次

AgentFlowError (基类)
├── AuthenticationError   # 认证失败
├── ConnectionError       # 网络连接故障
├── RateLimitError        # 请求频率过高
├── CircuitBreakerOpenError # 熔断器开启
├── TaskNotFoundError     # 任务不存在
├── TaskStateError        # 任务状态错误
├── ProtocolError         # 协议格式错误
├── TimeoutError          # 操作超时
├── ValidationError       # 参数校验失败
├── ConfigurationError    # 配置错误
├── QuotaError            # 额度不足
├── FileDownloadError     # 文件下载失败
└── ServiceError          # 服务端错误

使用示例

from agentflow import RequesterClient
from agentflow.types.exceptions import (
    QuotaError, TimeoutError, ServiceError, ConnectionError
)

client = RequesterClient(api_key="af_live_xxx")

try:
    result = client.run_and_wait("svc-id", {"text": "hello"}, timeout=300)
except QuotaError as e:
    print(f"额度不足: 需要{e.required}, 可用{e.available}")
except TimeoutError as e:
    print(f"超时: {e.timeout}s")
except ConnectionError as e:
    print(f"连接失败: {e.url}")
except ServiceError as e:
    print(f"服务错误: HTTP {e.status_code} - {e.error_body}")
except AgentFlowError as e:
    print(f"其他错误: {e}")

重试与熔断

指数退避重试

SDK 内置自动重试(可重试状态码:429、500、502、503、504):

config = AgentFlowConfig(
    retry_max=3,           # 最大重试次数
    retry_base_delay=1.0,  # 基础延迟(秒)
    retry_max_delay=30.0   # 最大延迟(秒)
)
# 实际延迟: base * 2^attempt + jitter
# 第1次重试: ~1-2s, 第2次: ~2-4s, 第3次: ~4-8s

熔断器保护

连续失败达到阈值后自动熔断,避免雪崩:

config = AgentFlowConfig(
    circuit_failure_threshold=5,   # 连续 5 次失败触发熔断
    circuit_recovery_timeout=30.0  # 30 秒后尝试恢复
)

熔断器状态:关闭 → 打开(拒绝请求)→ 半开(试探性恢复)→ 关闭

令牌桶限流

config = AgentFlowConfig(
    rate_limit_rps=10.0,   # 每秒 10 个请求
    rate_limit_burst=20    # 突发最多 20 个
)

状态持久化

启用后 SDK 会将任务状态保存到磁盘,重启后可恢复未完成任务。

config = AgentFlowConfig(
    state_persistence_enabled=True,
    state_storage_path="./agentflow_state"
)

worker = ServiceWorker(node_id="...", node_secret="...", config=config)
worker.register_service("svc-1", handler)

# 启动前恢复未完成任务
recovered = worker.recover_incomplete_tasks()
print(f"恢复了 {recovered} 个未完成任务")

worker.start()

文件下载

处理需求方上传的文件:

def file_handler(input_data: dict, ctx: TaskContext) -> dict:
    file_url = input_data.get("file_url")
    if file_url:
        # 下载到本地
        local_path = ctx._client.download_file(
            file_url,
            local_path="./workspace/input.pdf",
            timeout=60
        )
        try:
            result = process_file(local_path)
        finally:
            import os
            if os.path.exists(local_path):
                os.remove(local_path)

    return {"machine_data": result, "ui_content": [...]}

上下文管理器

SDK 支持 with 语句,自动管理资源生命周期:

# 离开 with 块时自动关闭 HTTP 连接
with ServiceWorker(node_id, node_secret) as worker:
    worker.register_service("svc-1", handler)
    worker.start()

心跳机制

节点通过启停信号管理上线/下线状态:

  • 启动时:自动发送上线信号,关联服务状态变为"在线"
  • 运行时:SDK 定期发送心跳(默认 30s),保持在线
  • 停止时:自动发送下线信号,关联服务状态变为"离线"
  • 离线判定:超过 3 分钟无心跳,平台自动标记为离线

指标收集

config = AgentFlowConfig(
    metrics_enabled=True,
    metrics_port=9090
)

worker = ServiceWorker(node_id, node_secret, config=config)
# 指标可通过 http://localhost:9090 访问

# 代码中获取
metrics = worker.get_metrics()
stats = worker.get_http_stats()

最佳实践

错误处理

def robust_handler(input_data: dict, ctx: TaskContext) -> dict:
    try:
        result = your_logic(input_data)
        return {"machine_data": result, "ui_content": [...]}
    except Exception as e:
        # 友好错误信息,SDK 会调用 fail 接口退还额度
        return {
            "machine_data": {"error": str(e)},
            "ui_content": [{"type": "markdown", "content": f"## 处理失败

{str(e)}"}]
        }

环境变量管理凭证

import os
worker = ServiceWorker(
    node_id=os.getenv("AGENTFLOW_NODE_ID"),
    node_secret=os.getenv("AGENTFLOW_NODE_SECRET"),
    api_base=os.getenv("AGENTFLOW_API_BASE", "https://your-platform.com")
)

日志调试

config = AgentFlowConfig(log_level="DEBUG", log_format="text")
# 或环境变量: LOG_LEVEL=debug

常见问题

Q: 节点离线? 检查 SDK 是否运行、网络是否正常。重启 SDK 会自动上线。

Q: 多机器共享节点? 每台机器创建独立节点,不要共用凭证。

Q: 更新处理函数? 修改代码重启 Worker 即可,正在执行的任务不受影响。

Q: 处理函数抛异常? SDK 会捕获并自动调用 fail 接口,额度返还需求方。