DuanFlow 文档
端流把 GPU 函数、HTTP 服务、批处理任务、队列和实时价格调度收敛到一个 Python SDK。你写业务函数,平台负责构建镜像、调度 GPU、暴露 Endpoint 和回收空闲资源。
DuanFlow Documentation
DuanFlow turns Python functions into GPU-backed services, jobs, queues, and HTTP endpoints. You write application code while the platform builds images, schedules GPUs, exposes endpoints, and scales idle capacity down.
快速开始
最小的 DuanFlow 应用只需要三个动作:安装 SDK、登录 CLI、声明一个可远程调用的函数。开发时可以本地运行;部署后使用 .remote() 把调用发送到云端 GPU worker。
Quickstart
A minimal DuanFlow app has three steps: install the SDK, authenticate the CLI, and declare a remote function. During development the file is normal Python; after deployment, .remote() sends calls to a cloud GPU worker.
pip install duanflow
duanflow auth login
import duanflow as df
app = df.App("hello-gpu")
@app.function(gpu="L40S", memory="48Gi", timeout=300)
def classify_ticket(text):
label = model.predict(text)
return {"label": label, "input": text}
duanflow deploy app.py
duanflow run app.py::classify_ticket --text "我的模型接口超时了"
应用、镜像与 Secret
df.App 是部署单元。它描述应用名称、运行镜像、依赖、环境变量、Secret 和默认资源策略。一个 App 可以同时包含函数、Endpoint、队列 worker 和训练任务。
Apps, Images, and Secrets
df.App is the deployment unit. It describes the app name, runtime image, dependencies, environment variables, secrets, and default resource policy. One app can contain functions, endpoints, queue workers, and training jobs.
import duanflow as df
image = (
df.Image.cuda("12.4", python="3.11")
.pip_install("torch", "transformers", "accelerate")
.apt_install("ffmpeg")
)
app = df.App(
"customer-ai-api",
image=image,
secrets=[df.Secret.from_name("hf-prod")],
env={"MODEL_ID": "Qwen/Qwen2.5-32B-Instruct"},
)
GPU 函数
GPU 函数适合被 Python 代码、批处理、队列或另一个 Endpoint 调用。模型加载应该放在模块级或 class 初始化阶段,这样 warm worker 可以复用权重。
GPU Functions
GPU functions are best for compute called by Python code, batches, queues, or another endpoint. Put model loading at module scope or inside a lifecycle class so warm workers can reuse weights.
from transformers import AutoModelForCausalLM, AutoTokenizer
import duanflow as df
app = df.App("qwen-summarizer")
tokenizer = AutoTokenizer.from_pretrained(os.environ["MODEL_ID"])
model = AutoModelForCausalLM.from_pretrained(os.environ["MODEL_ID"]).cuda()
@app.function(gpu="H100", memory="80Gi", concurrency=12)
def summarize(document):
prompt = "Summarize this document in 5 bullets:\n" + document
tokens = tokenizer(prompt, return_tensors="pt").to("cuda")
output = model.generate(**tokens, max_new_tokens=512)
return tokenizer.decode(output[0], skip_special_tokens=True)
HTTP Endpoint
Endpoint 把函数暴露成 HTTPS API。你可以指定路径、方法、鉴权策略、warm 实例数和突发扩容上限。返回值会序列化为 JSON、文本或二进制响应。
HTTP Endpoints
Endpoints expose functions as HTTPS APIs. Configure path, method, auth policy, warm workers, and burst limits. Return values can be serialized as JSON, text, or binary responses.
@app.endpoint(
method="POST",
path="/v1/chat/completions",
gpu="H100",
keep_warm=1,
auth=df.Auth.bearer("DUANFLOW_API_KEY"),
)
def chat_completions(request):
body = request.json()
answer = chat_model.generate(body["messages"], temperature=body.get("temperature", 0.7))
return {"model": "duanflow-qwen", "choices": [{"message": answer}]}
任务、队列与批处理
长时间训练用 @app.job,事件驱动处理用 @app.worker,海量数据 fan-out 用 .map()。这些模式共用同一套日志、重试和资源策略。
Jobs, Queues, and Batches
Use @app.job for long training runs, @app.worker for event-driven processing, and .map() for large fan-out workloads. They share logs, retries, and resource policies.
audio_queue = df.Queue("audio-inbox", max_retries=3)
@app.worker(queue=audio_queue, gpu="A10G", scale_to=12)
def transcribe_audio(job):
transcript = whisper.transcribe(job["input_url"])
return df.storage.write_json(job["output_url"], transcript)
@app.job(gpu="A100-80G", gpu_count=4, timeout=10800)
def train_lora(dataset_uri):
return trainer.fit(dataset_uri, rank=16)
GPU 智能调度
调度器会基于 GPU 类型、显存、区域、价格上限和优化目标选择资源池。批处理通常选价格优先;线上 API 通常选延迟优先;合规场景可以锁定地域。
GPU Router
The router selects a resource pool using GPU type, memory, region, price cap, and optimization target. Batch workloads usually optimize for price; production APIs optimize for latency; compliance-sensitive apps pin regions.
@app.function(
gpu=["H100", "A100-80G", "L40S"],
memory="48Gi+",
optimize="price",
max_price=1.20,
region=["cn-shanghai", "ap-singapore"],
)
def embed_documents(documents):
return encoder.encode(documents)
部署、日志与版本
部署会生成不可变 release。你可以查看构建日志、运行日志、请求日志和调度事件;回滚时只需要指定历史 release。
Deploy, Logs, and Releases
Deployments create immutable releases. You can inspect build logs, runtime logs, request logs, and scheduler events; rollback points to a previous release.
duanflow deploy app.py --env prod --name customer-ai-api
duanflow logs customer-ai-api --follow
duanflow logs customer-ai-api --kind scheduler --since 30m
duanflow releases customer-ai-api
duanflow rollback customer-ai-api --to rel_20260505_1842
SDK 速查
这些是 mock 页面中使用的核心 SDK 概念,方便演示时快速解释能力边界。
SDK Reference Cheatsheet
These are the core SDK concepts used throughout the mock pages, useful when explaining the product surface during a demo.