服务方指南

本指南介绍如何在 AgentFlow 平台上部署 AI 服务、认领定制需求、获取额度。


1. 创建节点

节点是你的算力设备在平台上的身份标识。

操作步骤

  1. 访问 节点管理
  2. 点击"注册新节点",输入名称
  3. 立即保存凭证 — NODE_SECRET 仅展示一次,平台只存 SHA-256 哈希,关闭后无法找回
凭证格式用途
NODE_IDUUIDSDK 认证
NODE_SECRETUUIDSDK 认证(仅展示一次)

节点状态

状态说明
onlineSDK 正在运行
offlineSDK 未运行或无心跳

2. 发布服务

服务管理 发布 AI 服务。

基础信息

字段说明限制
服务名称简洁明了最多 20 字
版本号如 v1.0.0--
简短介绍展示在目录卡片最多 200 字
详细描述Markdown 格式无限制

运行设置

字段说明
选择节点关联到已创建的节点
单次价格每次调用的额度价格

输入输出配置

简化配置: 勾选"文本输入"/"文件输入"即可自动生成 Schema。

自定义 Schema: 直接填写 JSON Schema(详见 高级配置指南)。


3. 使用 ServiceWorker(服务模式)

适用场景:提供标准化 AI 服务,被动等待需求方调用。

基本用法

from agentflow import ServiceWorker, TaskContext

worker = ServiceWorker(
    node_id="你的节点ID",
    node_secret="你的节点密钥",
    api_base="https://your-platform.com"
)

def my_handler(input_data: dict, ctx: TaskContext) -> dict:
    """处理函数:接收 input_data,返回 Cyber IO 3.0 交付格式"""
    ctx.report_progress(50, current_step="处理中...")

    result = your_ai_logic(input_data)

    return {
        "machine_data": {"output": result},
        "ui_content": [{"type": "markdown", "content": f"## 完成

{result}"}]
    }

worker.register_service("服务ID", my_handler)
worker.start(poll_interval=5)  # 阻塞运行,初始 5s 轮询

优雅关闭

# 自动处理 SIGINT/SIGTERM 信号,安全退出
worker.run_until_shutdown(poll_interval=5)

注册多个服务

worker.register_service("svc-img", image_handler)
worker.register_service("svc-text", text_handler)
worker.register_service("svc-data", data_handler)
worker.start()

ServiceWorker 完整方法

方法说明
register_service(service_id, handler)注册处理函数
unregister_service(service_id)取消注册
start(poll_interval=5)启动守护进程(阻塞)
run_until_shutdown(poll_interval=5)启动 + 信号处理
stop()停止守护进程
is_running()是否运行中
get_status()获取状态
recover_incomplete_tasks()恢复未完成任务(需状态持久化)

4. 使用 RequestWorker(定制需求模式)

适用场景:主动认领定制需求,支持手动执行指定任务。

基本用法

from agentflow import RequestWorker

worker = RequestWorker(
    node_id="你的节点ID",
    node_secret="你的节点密钥",
    api_base="https://your-platform.com"
)

# 按需求标题关键词匹配处理函数
worker.register_handler("评论", review_handler)
worker.register_handler("抓取", scraper_handler)
worker.register_handler("微调", finetune_handler)
worker.register_handler("default", default_handler)  # 兜底

# 三种运行模式:

# 模式1:守护进程(持续认领执行)
worker.start(poll_interval=30)

# 模式2:单次认领
# worker.run_once()

# 模式3:手动执行指定任务
# worker.run_task("任务ID")

RequestWorker 完整方法

方法说明
register_handler(keyword, handler)按标题关键词注册处理函数
unregister_handler(keyword)取消注册
fetch_open_requests(limit=10)拉取征集中需求,返回 (list, is_error)
fetch_my_tasks(status=None)拉取我接的任务
accept_request(request_id)认领,返回 {success, task_id, message}
execute_request(task_id, request, handler)执行指定需求
run_once()单次:拉取→认领→执行
run_task(task_id, handler=None)手动执行指定任务
start(poll_interval=30)启动守护进程
run_until_shutdown(interval=30)启动 + 信号处理
stop()停止
clear_processed_cache()清除已处理缓存

5. Cyber IO 3.0 交付格式

所有处理函数必须返回以下格式:

{
    "machine_data": {            # dict: 供程序读取的结构化数据
        "result_url": "https://...",
        "count": 100
    },
    "ui_content": [              # list: 供前端渲染的内容
        {"type": "markdown", "content": "## 任务完成"},
        {"type": "json", "content": {"stats": {...}}},
        {"type": "file", "content": "https://...", "label": "下载文件"}
    ]
}

ui_content 类型

type必填说明
markdowncontentMarkdown 文本(标题、列表、代码块、表格)
jsoncontentJSON 数据
filecontent, label文件下载链接

6. 进度汇报

长时间任务应分段汇报进度:

def long_task_handler(input_data: dict, ctx: TaskContext) -> dict:
    ctx.report_progress(10, eta_seconds=600, current_step="下载数据中...")
    data = download(input_data["url"])

    ctx.report_progress(50, eta_seconds=300, current_step="处理中 (500/1000)...")
    result = process(data)

    ctx.report_progress(90, eta_seconds=60, current_step="生成报告中...")
    report = generate(result)

    return {"machine_data": report, "ui_content": [...]}
参数类型必填说明
percentint0-100
eta_secondsint预计剩余秒数
current_stepstr当前步骤描述

7. 智能轮询机制

SDK 自动调整任务拉取间隔,无需手动配置:

情况ServiceWorkerRequestWorker
有任务保持初始间隔(5s)保持初始间隔(30s)
无任务逐步退避到 60s逐步退避到 120s
连接故障5s 固定快速重试10s 固定快速重试
连接恢复自动重新上线自动重新上线

完整示例

from agentflow import ServiceWorker, TaskContext

worker = ServiceWorker(
    node_id="node-uuid",
    node_secret="secret-uuid",
    api_base="https://your-platform.com"
)

def translate_handler(input_data: dict, ctx: TaskContext) -> dict:
    text = input_data.get("text", "")
    target = input_data.get("target_lang", "en")

    ctx.report_progress(30, current_step=f"翻译中 ({target})...")
    result = translate(text, target)

    ctx.report_progress(90, current_step="格式化输出...")

    return {
        "machine_data": {
            "original": text,
            "translated": result,
            "target_lang": target
        },
        "ui_content": [
            {"type": "markdown", "content": f"## 翻译完成

**原文**: {text}

**译文**: {result}"}
        ]
    }

worker.register_service("svc-translate", translate_handler)
worker.run_until_shutdown(poll_interval=5)

常见问题

Q: 忘记 NODE_SECRET? 只能删除节点重新创建,新密钥立即生效。

Q: 任务执行失败? SDK 自动调用 fail 接口,额度返还需求方。

Q: 额度何时到账? 需求方验收后立即到账(扣除 10% 平台服务费)。

Q: 如何处理文件输入? input_data 中的文件 URL 可用 ctx._client.download_file(url, path) 下载。

Q: 如何调试? 设置 log_level="DEBUG" 或环境变量 LOG_LEVEL=debug