卖家指南
本指南帮助你在 AgentFlow 平台上出售算力服务、接单悬赏、赚取收益。
目录
1. 创建节点
节点是你的算力设备在平台上的身份标识。所有服务都必须关联到节点才能运行。
1.1 创建步骤
- 访问 节点管理 页面
- 点击"注册新节点"
- 输入节点名称(如"我的客厅 4090 主机")
- 点击确认
1.2 保存凭证
创建成功后,系统会生成:
| 凭证 | 说明 | 用途 |
|---|---|---|
| NODE_ID | 节点唯一标识 | SDK 认证、API 调用 |
| NODE_SECRET | 节点鉴权密钥 | SDK 认证、API 调用 |
⚠️ 重要提示: NODE_SECRET 只显示一次,请务必保存!
凭证格式示例:
NODE_ID: 550e8400-e29b-41d4-a716-446655440000
NODE_SECRET: 7c9e6679-7425-40de-944b-e07fc1f90ae7
1.3 节点管理
在节点管理页面可以:
| 操作 | 说明 |
|---|---|
| 查看列表 | 查看节点列表和承载的服务数量 |
| 复制凭证 | 复制 NODE_ID 和 NODE_SECRET |
| 编辑名称 | 修改节点显示名称 |
| 删除节点 | 删除节点(会同步删除该节点下的所有服务) |
节点状态:
| 状态 | 说明 |
|---|---|
| online | 节点在线,SDK 正在运行 |
| offline | 节点离线,SDK 未运行 |
| maintenance | 维护中(预留) |
2. 发布服务
在 服务管理 页面发布你的 AI 服务。
2.1 基础信息
| 字段 | 说明 | 限制 |
|---|---|---|
| 服务名称 | 简洁明了的服务名称 | 最多 20 字 |
| 版本号 | 如 v1.0.0 | 建议格式 v1.0.0 |
| 简短介绍 | 展示在市场卡片上 | 最多 200 字 |
| 详细描述 | 支持 Markdown 的详细说明 | 无限制 |
详细描述建议:
- 功能介绍
- 使用场景
- 输入输出说明
- 注意事项
2.2 运行设置
| 字段 | 说明 |
|---|---|
| 选择节点 | 运行此服务的节点(必须关联) |
| 单次价格 | 每次调用的积分价格 |
价格建议:
- 参考市场同类服务定价
- 考虑算力成本和时间成本
- 新服务可以设置较低价格吸引买家
2.3 输入输出配置
简化配置
| 配置项 | 说明 |
|---|---|
| 文本输入 | 是否接受文本输入 |
| 字数限制 | 文本最大字符数 |
| 文件输入 | 是否接受文件上传 |
| 文件格式 | 允许的文件类型(逗号分隔) |
| 文件大小 | 最大文件大小(MB) |
高级配置(JSON Schema)
对于复杂需求,可直接填写自定义 Schema:
// Input Schema - 买家需要提供的参数
{
"type": "object",
"required": ["image_url"],
"properties": {
"image_url": {
"type": "string",
"format": "uri",
"description": "待处理的图片URL"
},
"style": {
"type": "string",
"enum": ["cartoon", "oil_painting", "sketch"],
"default": "cartoon",
"description": "转换风格"
}
}
}
// Output Schema - 服务返回的数据结构
{
"type": "object",
"properties": {
"result_url": {
"type": "string",
"format": "uri",
"description": "处理后的图片URL"
}
}
}
详细说明请参考 高级配置指南。
2.4 服务上线
发布服务后,需要运行 SDK 节点才能接收订单:
- 保存服务 ID(在服务管理页面查看)
- 使用 SDK 运行节点(参考下方 SDK 使用指南)
- 节点心跳上报后,服务状态变为"在线"
服务状态判定:
- 节点启动时发送上线信号 → 服务状态变为"在线"
- 超过 3 分钟无心跳 → 服务状态变为"离线"
- 节点停止时发送下线信号 → 服务状态变为"离线"
2.5 管理服务
在服务管理页面可以:
| 操作 | 说明 |
|---|---|
| 查看列表 | 查看服务列表和状态 |
| 编辑服务 | 修改服务配置 |
| 删除服务 | 永久删除服务 |
| 查看数据 | 查看销量、收藏、收益 |
3. 接单悬赏
在 悬赏大厅 浏览买家发布的需求。
3.1 浏览悬赏
悬赏大厅展示所有招募中的悬赏:
| 信息 | 说明 |
|---|---|
| 标题 | 悬赏标题 |
| 简短介绍 | 需求概述 |
| 悬赏金额 | 完成后可获得的积分 |
| 违约金 | 超时未交付时扣除的积分 |
| 交付时限 | 必须在多少小时内完成 |
筛选方式:
- 状态筛选:招募中 / 已完结
- 关键词搜索
3.2 查看详情
点击悬赏卡片,右侧弹出 Sheet 侧边栏展示:
| 区域 | 内容 |
|---|---|
| 基础信息 | 标题、悬赏金额、违约金、交付时限 |
| 详细描述 | Markdown 渲染的需求描述 |
| 输入输出 | 期望的输入输出格式 |
| 发布者 | 买家信息 |
3.3 抢单接单
- 点击"立即抢单"按钮
- 系统创建任务并锁定悬赏金额
- 显示任务凭证(Task ID)
- 保存 Task ID 用于后续交付
抢单规则:
- 不能接自己发布的悬赏
- 先到先得,同一时间只能有一个卖家接单
- 接单后必须在交付时限内完成
3.4 开发交付
接单成功后,使用 SDK 或手动交付成果:
使用 SDK 交付
from agentflow import BountyWorker
worker = BountyWorker(
node_id="your-node-id",
node_secret="your-node-secret"
)
# 注册处理函数
worker.register_handler("评论", my_handler)
# 手动执行指定任务
worker.run_task("tsk_xxx-xxx-xxx")
手动交付
通过 API 直接交付:
curl -X POST https://agentflow.com/api/node/tasks/TASK_ID/deliver \
-H "X-Node-ID: YOUR_NODE_ID" \
-H "Authorization: Bearer YOUR_NODE_SECRET" \
-H "Content-Type: application/json" \
-d '{
"machine_data": {"result": "..."},
"ui_content": [{"type": "markdown", "content": "完成"}]
}'
3.5 等待验收
买家验收后,悬赏金额(扣除平台手续费)打入你的钱包。
收益计算:
- 悬赏金额 + 违约金 = 总收益
- 平台手续费:10%
- 实际到账:总收益 × 90%
4. 管理订单
在 任务交付 页面查看所有接到的订单。
4.1 订单类型
| 类型 | 标识 | 说明 |
|---|---|---|
| 服务订单 | 无 | 买家购买你的服务产生 |
| 悬赏订单 | 紫色"悬赏"标签 | 你接单的悬赏任务 |
4.2 订单详情
点击"详情"查看:
| 信息 | 说明 |
|---|---|
| Task ID | 任务凭证,用于 SDK 交付 |
| 执行进度 | 实时进度条 |
| 输入数据 | 买家提供的输入参数 |
| 输出格式 | 期望的输出格式(悬赏订单) |
| 剩余时间 | 悬赏订单显示交付倒计时 |
| 财务信息 | 锁定积分、卖家收入 |
4.3 订单状态
| 状态 | 说明 | 操作 |
|---|---|---|
| 待处理 | 等待节点拉取任务 | SDK 自动拉取 |
| 执行中 | 正在处理 | 汇报进度 |
| 待验收 | 已交付 | 等待买家验收 |
| 已完成 | 验收通过 | 积分到账 |
| 已失败 | 执行失败 | 查看错误日志 |
5. SDK 使用指南
安装 SDK
pip install agentflow
SDK v3.0 特性
| 特性 | 说明 |
|---|---|
| 同步/异步双模式 | 支持 ServiceWorker 和 AsyncServiceWorker |
| 配置管理 | AgentFlowConfig 支持环境变量和 JSON 配置 |
| 结构化日志 | Python logging 模块,JSON 格式输出 |
| 异常层次结构 | 清晰的异常类型,便于错误处理 |
| 上下文管理器 | with / async with 自动资源清理 |
| 重试机制 | 指数退避重试,熔断器保护 |
| 优雅关闭 | 跨平台信号处理,安全退出 |
| 智能轮询 | 动态调整轮询间隔,降低 API 负载 |
同步 vs 异步:如何选择?
| 场景 | 推荐模式 | 原因 |
|---|---|---|
| 简单脚本、单任务 | 同步模式 | 代码简单,易于调试 |
| 高并发、多任务 | 异步模式 | 高性能,资源利用率高 |
| 需要同时处理多个请求 | 异步模式 | 避免阻塞等待 |
| 快速原型开发 | 同步模式 | 开发效率高 |
服务模式 (ServiceWorker)
适用于:持续监听服务订单,自动执行。
同步版本(推荐入门)
from agentflow import ServiceWorker, AgentFlowConfig
# 配置节点凭证(从节点管理页面获取)
NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"
# SDK v3.0 配置管理
config = AgentFlowConfig(
api_base=API_BASE,
api_timeout=30.0,
retry_max=3,
log_level="INFO"
)
# 创建 Worker(使用上下文管理器)
with ServiceWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
config=config
) as worker:
# 定义处理函数
def my_handler(input_data: dict, ctx) -> dict:
"""
处理服务订单
Args:
input_data: 买家输入的参数
ctx: 任务上下文,用于进度汇报
Returns:
Cyber IO 3.0 格式的交付物
"""
# 汇报进度
ctx.report_progress(
percent=50,
eta_seconds=60,
current_step="正在处理..."
)
# 处理逻辑
result = process(input_data)
# 返回结果(Cyber IO 3.0 格式)
return {
"machine_data": {
"result_url": result.url
},
"ui_content": [
{
"type": "markdown",
"content": f"## 处理完成\n\n结果: {result.url}"
},
{
"type": "file",
"content": result.url,
"label": "结果文件"
}
]
}
# 注册服务
SERVICE_ID = "your-service-id"
worker.register_service(SERVICE_ID, my_handler)
# 启动守护进程(阻塞主线程)
worker.start(
poll_interval=5 # 初始任务拉取间隔(秒),智能退避最大 60s
)
异步版本(高并发场景)
import asyncio
from agentflow.async_ import AsyncServiceWorker
from agentflow.config.settings import AgentFlowConfig
NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"
config = AgentFlowConfig(
api_base=API_BASE,
api_timeout=30.0,
retry_max=3,
log_level="INFO"
)
async def my_handler(input_data: dict, ctx) -> dict:
"""异步处理函数"""
# 汇报进度
await ctx.report_progress(
percent=50,
eta_seconds=60,
current_step="正在处理..."
)
# 异步处理逻辑
result = await async_process(input_data)
return {
"machine_data": {"result_url": result.url},
"ui_content": [
{"type": "markdown", "content": "## 处理完成"},
{"type": "file", "content": result.url, "label": "结果文件"}
]
}
async def main():
# 使用异步上下文管理器
async with AsyncServiceWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
config=config
) as worker:
# 注册服务
worker.register_service("service-id", my_handler)
# 启动守护进程
await worker.start(poll_interval=5)
if __name__ == "__main__":
asyncio.run(main())
智能轮询机制 v3.2
SDK v3.2 引入智能轮询,动态调整轮询间隔:
| 情况 | 行为 |
|---|---|
| 有任务时 | 保持初始高频轮询(如 5 秒) |
| 无任务时 | 逐步退避(5s → 10s → 20s → 30s → 60s) |
| 退避后遇到任务 | 立即恢复高频轮询 |
优势: API 负载降低约 90%
悬赏模式 (BountyWorker)
适用于:主动抢单、手动执行悬赏任务。
同步版本
from agentflow import BountyWorker, AgentFlowConfig
NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"
config = AgentFlowConfig(api_base=API_BASE, log_level="INFO")
with BountyWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
config=config
) as worker:
# 注册处理函数(根据悬赏标题关键词匹配)
worker.register_handler("评论", review_scraper_handler)
worker.register_handler("抓取", scraper_handler)
worker.register_handler("微调", fine_tuning_handler)
worker.register_handler("default", default_handler) # 默认处理函数
# 运行模式 1:守护进程模式(自动抢单)
worker.start(poll_interval=30) # 初始间隔 30s,智能退避最大 120s
# 运行模式 2:单次抢单模式
# worker.run_once()
# 运行模式 3:手动执行指定任务
# success = worker.run_task("tsk_xxx-xxx-xxx")
异步版本
import asyncio
from agentflow.async_ import AsyncBountyWorker
from agentflow.config.settings import AgentFlowConfig
NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"
config = AgentFlowConfig(api_base=API_BASE, log_level="INFO")
async def main():
async with AsyncBountyWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
config=config
) as worker:
# 注册处理函数
worker.register_handler("评论", async_review_handler)
worker.register_handler("抓取", async_scraper_handler)
# 守护进程模式
await worker.start(poll_interval=30)
if __name__ == "__main__":
asyncio.run(main())
查看我的任务
# 拉取我接的所有悬赏任务
tasks = worker.fetch_my_tasks()
# 按状态过滤
processing_tasks = worker.fetch_my_tasks(status="processing")
输出示例:
============================================================
📋 我的悬赏任务 (3 个)
============================================================
🆔 tsk_8f9a2b4c-1d3e... | 京东商品评论抓取... | processing
🆔 tsk_7c8d9e0f-1a2b... | LLaMA 3 微调服务... | delivered
============================================================
6. Cyber IO 3.0 交付协议
所有交付结果必须遵循 Cyber IO 3.0 协议格式。
协议结构
{
# 供 Agent/程序读取的结构化数据
"machine_data": {
"result_url": "https://...",
"count": 100,
"status": "success"
},
# 供前端渲染的多模态内容数组
"ui_content": [
{"type": "markdown", "content": "## 任务完成"},
{"type": "file", "content": "https://...", "label": "下载文件"}
]
}
machine_data 字段
- 必须是字典(dict)
- 供 Agent 或程序自动读取
- 建议与 Output Schema 结构一致
- 可包含任意 JSON 可序列化的数据
ui_content 字段
- 必须是数组(list)
- 每个元素必须包含
type和content字段 - 按数组顺序依次渲染
支持的类型:
| type | 说明 | 必填字段 |
|---|---|---|
markdown | Markdown 文本 | content |
json | JSON 数据 | content |
file | 文件下载 | content, label |
完整示例
def my_handler(input_data: dict, ctx) -> dict:
# 处理逻辑
result = process(input_data)
# 返回 Cyber IO 3.0 格式
return {
"machine_data": {
"summary": {
"total_reviews": 1000,
"average_rating": 4.2
},
"download_url": result.download_url
},
"ui_content": [
{
"type": "markdown",
"content": """## 📊 评论抓取完成
**抓取数量**: 1000 条
### 情感分析
- 😊 好评: 720 条 (72%)
- 😠 差评: 180 条 (18%)
- ⭐ 平均评分: 4.2/5.0"""
},
{
"type": "file",
"content": result.download_url,
"label": "完整评论数据.json"
}
]
}
进度汇报
对于长时间运行的任务,可以汇报进度让买家看到实时状态。
使用方法
def long_running_handler(input_data: dict, ctx) -> dict:
# 阶段 1
ctx.report_progress(
percent=10,
eta_seconds=3600, # 预计剩余 1 小时
current_step="正在下载数据..."
)
# 阶段 2
ctx.report_progress(
percent=50,
eta_seconds=1800,
current_step="正在处理数据 (500/1000)..."
)
# 阶段 3
ctx.report_progress(
percent=90,
eta_seconds=300,
current_step="正在生成报告..."
)
# 最终交付
return {
"machine_data": {...},
"ui_content": [...]
}
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
percent | int | 进度百分比(0-100) |
eta_seconds | int | 预计剩余秒数(可选) |
current_step | str | 当前执行步骤描述(可选) |
完整示例
示例 1:电商评论抓取(同步版本)
from agentflow import BountyWorker, AgentFlowConfig
import time
NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"
config = AgentFlowConfig(api_base=API_BASE, log_level="INFO")
def ecommerce_review_scraper(input_data: dict, ctx) -> dict:
product_url = input_data.get("product_url", "")
max_reviews = input_data.get("max_reviews", 1000)
# 阶段 1:初始化
ctx.report_progress(
percent=10,
eta_seconds=3600,
current_step="正在初始化爬虫引擎..."
)
time.sleep(2)
# 阶段 2:抓取
for i in range(1, 5):
ctx.report_progress(
percent=10 + i * 20,
eta_seconds=3600 - i * 600,
current_step=f"正在抓取评论 ({i * 250}/{max_reviews})..."
)
time.sleep(1)
# 返回 Cyber IO 3.0 格式
return {
"machine_data": {
"total_reviews": 1000,
"download_url": "https://storage.../reviews.json"
},
"ui_content": [
{
"type": "markdown",
"content": "## 📊 评论抓取完成\n\n共抓取 1000 条评论"
},
{
"type": "file",
"content": "https://storage.../reviews.json",
"label": "评论数据.json"
}
]
}
# 使用上下文管理器
with BountyWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
config=config
) as worker:
worker.register_handler("评论", ecommerce_review_scraper)
worker.register_handler("抓取", ecommerce_review_scraper)
worker.start()
示例 2:手动执行任务
# 接单成功后保存 Task ID
task_id = "tsk_8f9a2b4c-1d3e-4f5a-9b8c-7d6e5f4a3b2c"
# 手动执行指定任务
success = worker.run_task(task_id)
if success:
print("✅ 任务执行成功")
else:
print("❌ 任务执行失败")
常见问题
Q: 如何调试节点?
设置日志级别:
LOG_LEVEL=debug python your_worker.py
Q: 任务执行失败怎么办?
SDK 会自动调用 fail() 接口报告失败,积分会退还给买家。你可以查看错误日志排查问题。
Q: 如何处理多个服务?
为每个服务注册不同的处理函数:
worker.register_service("service-id-1", image_generator)
worker.register_service("service-id-2", text_analyzer)
Q: 悬赏金额什么时候到账?
买家验收通过后,悬赏金额(扣除平台手续费)会立即打入你的钱包。
Q: 如何查看节点状态?
在 节点管理 页面查看节点状态和承载的服务数量。
Q: 节点离线了怎么办?
检查 SDK 是否正常运行,确保网络连接正常。重启 SDK 后节点会自动上线。
Q: 如何更新服务配置?
在服务管理页面点击编辑,修改后保存即可。正在执行的任务不受影响。
Q: 智能轮询是如何工作的?
- 有任务时:保持高频轮询(如 5 秒)
- 无任务时:逐步退避(5s → 10s → 20s → 30s → 60s)
- 退避后遇到任务:立即恢复高频轮询
这样可以降低约 90% 的 API 负载。
Q: 如何实现优雅关闭?
使用 run_until_shutdown() 方法:
# 注册信号处理器,收到 SIGINT/SIGTERM 时优雅关闭
worker.run_until_shutdown(poll_interval=5)
Q: 处理函数可以抛出异常吗?
可以。SDK 会捕获异常并自动调用 fail() 接口报告失败。但建议在处理函数内部捕获异常,返回更友好的错误信息。
Q: 如何处理文件上传?
买家上传的文件会生成公开访问 URL,你可以在 input_data 中获取:
def my_handler(input_data: dict, ctx) -> dict:
file_url = input_data.get("file_url")
# 下载文件
local_path = ctx._client.download_file(file_url, "./temp/file.ext")
try:
# 处理文件
result = process_file(local_path)
return {...}
finally:
# 清理临时文件
import os
if os.path.exists(local_path):
os.remove(local_path)
Q: 违约金是如何计算的?
- 悬赏超时未交付时,从卖家账户扣除违约金
- 违约金 + 悬赏金额 = 买家验收后卖家获得的总额
- 如果按时交付,违约金也支付给卖家