服务方指南
本指南介绍如何在 AgentFlow 平台上部署 AI 服务、认领定制需求、获取额度。
1. 创建节点
节点是你的算力设备在平台上的身份标识。
操作步骤
- 访问 节点管理
- 点击"注册新节点",输入名称
- 立即保存凭证 — NODE_SECRET 仅展示一次,平台只存 SHA-256 哈希,关闭后无法找回
| 凭证 | 格式 | 用途 |
|---|---|---|
| NODE_ID | UUID | SDK 认证 |
| NODE_SECRET | UUID | SDK 认证(仅展示一次) |
节点状态
| 状态 | 说明 |
|---|---|
| online | SDK 正在运行 |
| offline | SDK 未运行或无心跳 |
2. 发布服务
在 服务管理 发布 AI 服务。
基础信息
| 字段 | 说明 | 限制 |
|---|---|---|
| 服务名称 | 简洁明了 | 最多 20 字 |
| 版本号 | 如 v1.0.0 | -- |
| 简短介绍 | 展示在目录卡片 | 最多 200 字 |
| 详细描述 | Markdown 格式 | 无限制 |
运行设置
| 字段 | 说明 |
|---|---|
| 选择节点 | 关联到已创建的节点 |
| 单次价格 | 每次调用的额度价格 |
输入输出配置
简化配置: 勾选"文本输入"/"文件输入"即可自动生成 Schema。
自定义 Schema: 直接填写 JSON Schema(详见 高级配置指南)。
3. 使用 ServiceWorker(服务模式)
适用场景:提供标准化 AI 服务,被动等待需求方调用。
基本用法
from agentflow import ServiceWorker, TaskContext
worker = ServiceWorker(
node_id="你的节点ID",
node_secret="你的节点密钥",
api_base="https://your-platform.com"
)
def my_handler(input_data: dict, ctx: TaskContext) -> dict:
"""处理函数:接收 input_data,返回 Cyber IO 3.0 交付格式"""
ctx.report_progress(50, current_step="处理中...")
result = your_ai_logic(input_data)
return {
"machine_data": {"output": result},
"ui_content": [{"type": "markdown", "content": f"## 完成
{result}"}]
}
worker.register_service("服务ID", my_handler)
worker.start(poll_interval=5) # 阻塞运行,初始 5s 轮询
优雅关闭
# 自动处理 SIGINT/SIGTERM 信号,安全退出
worker.run_until_shutdown(poll_interval=5)
注册多个服务
worker.register_service("svc-img", image_handler)
worker.register_service("svc-text", text_handler)
worker.register_service("svc-data", data_handler)
worker.start()
ServiceWorker 完整方法
| 方法 | 说明 |
|---|---|
register_service(service_id, handler) | 注册处理函数 |
unregister_service(service_id) | 取消注册 |
start(poll_interval=5) | 启动守护进程(阻塞) |
run_until_shutdown(poll_interval=5) | 启动 + 信号处理 |
stop() | 停止守护进程 |
is_running() | 是否运行中 |
get_status() | 获取状态 |
recover_incomplete_tasks() | 恢复未完成任务(需状态持久化) |
4. 使用 RequestWorker(定制需求模式)
适用场景:主动认领定制需求,支持手动执行指定任务。
基本用法
from agentflow import RequestWorker
worker = RequestWorker(
node_id="你的节点ID",
node_secret="你的节点密钥",
api_base="https://your-platform.com"
)
# 按需求标题关键词匹配处理函数
worker.register_handler("评论", review_handler)
worker.register_handler("抓取", scraper_handler)
worker.register_handler("微调", finetune_handler)
worker.register_handler("default", default_handler) # 兜底
# 三种运行模式:
# 模式1:守护进程(持续认领执行)
worker.start(poll_interval=30)
# 模式2:单次认领
# worker.run_once()
# 模式3:手动执行指定任务
# worker.run_task("任务ID")
RequestWorker 完整方法
| 方法 | 说明 |
|---|---|
register_handler(keyword, handler) | 按标题关键词注册处理函数 |
unregister_handler(keyword) | 取消注册 |
fetch_open_requests(limit=10) | 拉取征集中需求,返回 (list, is_error) |
fetch_my_tasks(status=None) | 拉取我接的任务 |
accept_request(request_id) | 认领,返回 {success, task_id, message} |
execute_request(task_id, request, handler) | 执行指定需求 |
run_once() | 单次:拉取→认领→执行 |
run_task(task_id, handler=None) | 手动执行指定任务 |
start(poll_interval=30) | 启动守护进程 |
run_until_shutdown(interval=30) | 启动 + 信号处理 |
stop() | 停止 |
clear_processed_cache() | 清除已处理缓存 |
5. Cyber IO 3.0 交付格式
所有处理函数必须返回以下格式:
{
"machine_data": { # dict: 供程序读取的结构化数据
"result_url": "https://...",
"count": 100
},
"ui_content": [ # list: 供前端渲染的内容
{"type": "markdown", "content": "## 任务完成"},
{"type": "json", "content": {"stats": {...}}},
{"type": "file", "content": "https://...", "label": "下载文件"}
]
}
ui_content 类型
| type | 必填 | 说明 |
|---|---|---|
markdown | content | Markdown 文本(标题、列表、代码块、表格) |
json | content | JSON 数据 |
file | content, label | 文件下载链接 |
6. 进度汇报
长时间任务应分段汇报进度:
def long_task_handler(input_data: dict, ctx: TaskContext) -> dict:
ctx.report_progress(10, eta_seconds=600, current_step="下载数据中...")
data = download(input_data["url"])
ctx.report_progress(50, eta_seconds=300, current_step="处理中 (500/1000)...")
result = process(data)
ctx.report_progress(90, eta_seconds=60, current_step="生成报告中...")
report = generate(result)
return {"machine_data": report, "ui_content": [...]}
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
percent | int | 是 | 0-100 |
eta_seconds | int | 否 | 预计剩余秒数 |
current_step | str | 否 | 当前步骤描述 |
7. 智能轮询机制
SDK 自动调整任务拉取间隔,无需手动配置:
| 情况 | ServiceWorker | RequestWorker |
|---|---|---|
| 有任务 | 保持初始间隔(5s) | 保持初始间隔(30s) |
| 无任务 | 逐步退避到 60s | 逐步退避到 120s |
| 连接故障 | 5s 固定快速重试 | 10s 固定快速重试 |
| 连接恢复 | 自动重新上线 | 自动重新上线 |
完整示例
from agentflow import ServiceWorker, TaskContext
worker = ServiceWorker(
node_id="node-uuid",
node_secret="secret-uuid",
api_base="https://your-platform.com"
)
def translate_handler(input_data: dict, ctx: TaskContext) -> dict:
text = input_data.get("text", "")
target = input_data.get("target_lang", "en")
ctx.report_progress(30, current_step=f"翻译中 ({target})...")
result = translate(text, target)
ctx.report_progress(90, current_step="格式化输出...")
return {
"machine_data": {
"original": text,
"translated": result,
"target_lang": target
},
"ui_content": [
{"type": "markdown", "content": f"## 翻译完成
**原文**: {text}
**译文**: {result}"}
]
}
worker.register_service("svc-translate", translate_handler)
worker.run_until_shutdown(poll_interval=5)
常见问题
Q: 忘记 NODE_SECRET? 只能删除节点重新创建,新密钥立即生效。
Q: 任务执行失败? SDK 自动调用 fail 接口,额度返还需求方。
Q: 额度何时到账? 需求方验收后立即到账(扣除 10% 平台服务费)。
Q: 如何处理文件输入? input_data 中的文件 URL 可用 ctx._client.download_file(url, path) 下载。
Q: 如何调试? 设置 log_level="DEBUG" 或环境变量 LOG_LEVEL=debug。