EFT.WP.Data.Pipeline v1.0 (Technical White Paper V5.05)
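I. Purpose and Scope
This chapter provides the execution API and its normative implementation bindings: interface prototypes, request/response envelopes, error codes, authentication and idempotency, rate limiting, and version negotiation. It covers pipeline validation, metrology checks, reference-anchor checks, topology planning and execution, run metrics and lineage, replay, and publish/revoke. It is aligned with the data contract, the metrology conventions, the reference anchors, and the export manifest.
II. Service Surface (Normative)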
services:
  pipelines.v1:
    # Validation and checks
    - POST /api/v1/pipelines/validate           # structure/dependency/topology and cross-volume validation (blocking)
    - POST /api/v1/pipelines/lint               # run lint rules
    - POST /api/v1/pipelines/check_units        # metrology/unit consistency
    - POST /api/v1/pipelines/verify_references  # reference-anchor format and reachability
    # Planning and execution
    - POST /api/v1/pipelines/plan               # DAG planning and resource estimation
    - POST /api/v1/pipelines/run                # trigger execution (sync/async)
    - POST /api/v1/pipelines/metrics            # query run metrics (QPS/p99/ρ/cost)
    - POST /api/v1/pipelines/lineage            # generate/query the lineage graph
    - POST /api/v1/pipelines/replay             # replay from inputs_lock
    # Artifacts and release
    - POST /api/v1/pipelines/hash_artifact      # hash an artifact
    - POST /api/v1/pipelines/sign_artifact      # sign an artifact / verify a signature
    - POST /api/v1/pipelines/publish_release    # publish
    - POST /api/v1/pipelines/revoke_release     # revoke
III. Common Request/Response Envelopes and Authentication
request_envelope:
  headers:
    Authorization: "Bearer <oidc-token> | HMAC <key>:<sig>"
    x-eift-idempotency: "<uuid>"   # idempotency key (valid for ≥24 h)
    content-type: "application/json"
  body:
    spec?: { ... }                 # pipeline spec (YAML/JSON)
    options?: {dry_run?: true, strict?: true}
    filters?: {run_id?: "<id>", since?: "<ISO8601>", until?: "<ISO8601>"}
    artifacts?: [{path, bytes_b64?, sha256?}]
response_envelope:
  status: "ok" | "warn" | "error"
  errors: [{code, message, path?, see?}]
  warnings: [{code, message, path?, see?}]
  metrics: { ... }                 # planning/execution/metrology/cost statistics
  data?: { ... }                   # structured result (DAG/lineage/run details)
  version: "pipelines.v1"
security:
  auth: "OIDC bearer | HMAC"
  tls: "TLS1.2+"
  scope: ["validate","execute","metrics","lineage","publish","admin"]
rate_limits:
  per_key_per_minute: 120
  burst: 60
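One way to read the rate_limits values above is as a token bucket: each key refills at per_key_per_minute tokens per minute and can hold at most burst tokens. The chapter does not say which algorithm the service uses, so the following is only a sketch under that assumption; the TokenBucket class and its injectable clock are illustrative names, not part of the API.

```python
import time


class TokenBucket:
    """Token bucket: refills at `per_minute` tokens/minute, holds at most `burst`."""

    def __init__(self, per_minute=120, burst=60, clock=time.monotonic):
        self.rate = per_minute / 60.0   # tokens added per second
        self.capacity = float(burst)
        self.tokens = float(burst)      # assumption: a new key starts with a full bucket
        self.clock = clock
        self.last = clock()

    def allow(self):
        """Consume one token if available; False means the request should be throttled."""
        now = self.clock()
        # Refill for the elapsed time, never exceeding the burst capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A server would keep one bucket per API key and reject a request when allow() returns False. Passing a fake clock makes the behaviour easy to check without sleeping.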
IV. Normative OpenAPI Excerpt
openapi: 3.0.3
info: {title: "EFT Pipelines API", version: "v1"}
paths:
  /api/v1/pipelines/validate:
    post:
      summary: Validate pipeline spec against schema, topology, and cross-volume constraints
      requestBody: {required: true, content: {"application/json": {schema: {$ref: "#/components/schemas/SpecEnvelope"}}}}
      responses:
        "200": {description: "Result", content: {"application/json": {schema: {$ref: "#/components/schemas/Result"}}}}
  /api/v1/pipelines/run:
    post:
      summary: Execute pipeline (sync/async)
      requestBody: {required: true, content: {"application/json": {schema: {$ref: "#/components/schemas/RunRequest"}}}}
      responses:
        "200": {description: "Run accepted", content: {"application/json": {schema: {$ref: "#/components/schemas/RunResult"}}}}
components:
  schemas:
    SpecEnvelope: {type: object, properties: {spec: {}, options: {type: object}}}
    Result:
      type: object
      properties:
        status: {type: string, enum: [ok, warn, error]}
        errors: {type: array, items: {$ref: "#/components/schemas/Issue"}}
        warnings: {type: array, items: {$ref: "#/components/schemas/Issue"}}
        metrics: {type: object}
        data: {type: object}
    RunRequest:
      type: object
      properties:
        spec: {}
        options: {type: object, properties: {mode: {type: string, enum: ["sync", "async"]}}}
    RunResult:
      type: object
      properties:
        run_id: {type: string}
        state: {type: string, enum: ["queued", "running", "succeeded", "failed"]}
        dag: {type: object}
    Issue:
      type: object
      properties:
        code: {type: string}
        message: {type: string}
        path: {type: string}
        see: {type: array, items: {type: string}}
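As an illustration of how a client might sanity-check a response against the Result and Issue schemas above, here is a hand-written shape check. It is a sketch, not a full OpenAPI validator: the function names check_issue and check_result are hypothetical, and because the excerpt declares no required fields, every field is treated as optional.

```python
ALLOWED_STATUS = {"ok", "warn", "error"}


def check_issue(issue, where):
    """Return a list of problems for one Issue object located at `where`."""
    if not isinstance(issue, dict):
        return [f"{where}: expected object"]
    problems = []
    for key in ("code", "message", "path"):
        if key in issue and not isinstance(issue[key], str):
            problems.append(f"{where}.{key}: expected string")
    see = issue.get("see", [])
    if not isinstance(see, list) or not all(isinstance(s, str) for s in see):
        problems.append(f"{where}.see: expected array of strings")
    return problems


def check_result(result):
    """Return a list of problems; an empty list means the shape matches Result."""
    if not isinstance(result, dict):
        return ["$: expected object"]
    problems = []
    if "status" in result:
        status = result["status"]
        if not isinstance(status, str) or status not in ALLOWED_STATUS:
            problems.append("$.status: must be one of ok, warn, error")
    for field in ("errors", "warnings"):
        items = result.get(field, [])
        if not isinstance(items, list):
            problems.append(f"$.{field}: expected array")
            continue
        for i, issue in enumerate(items):
            problems.extend(check_issue(issue, f"$.{field}[{i}]"))
    for field in ("metrics", "data"):
        if field in result and not isinstance(result[field], dict):
            problems.append(f"$.{field}: expected object")
    return problems
```

In production a schema-driven validator generated from the full openapi.yaml would normally replace this; the sketch only shows what the excerpt constrains.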
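V. Endpoint Semantics (Key Points)
- /pipelines/validate (blocking)
  - Checks schema/regex rules, dependency anchors, metrology.units="SI" with check_dim=true, layers/edges topology, Σ_out→Σ_in compatibility, frozen splits and leakage guards, and the security minimum (credentials_ref).
- /pipelines/lint
  - Runs the Chapter 16 lint rules: minimum checks for topology, splits, idempotency, feature space, and security and compliance. Returns rule-level locations and suggested fixes.
- /pipelines/plan
  - Produces an executable DAG with resource and cost estimates: concurrency, queues, affinity, and quotas; projected QPS/p99/ρ; usd_per_kqps.
- /pipelines/run
  - options.mode: "sync|async". Returns the run_id and the DAG; in async mode, progress can be polled through /metrics. Failure receipts include compensation/rollback suggestions.
- /pipelines/metrics
  - Queries runtime metrics: qps, latency_ms.{p50,p95,p99}, ρ, net_mbps, size_bytes, cost_breakdown, and SLO compliance status.
- /pipelines/lineage
  - Generates lineage.graph from the spec or the run history (nodes carry version and hash); supports diffing and upstream/downstream impact analysis.
- /pipelines/replay
  - Requires inputs_lock; policy: "strict|lenient". Returns replay consistency (byte-level or within tolerance) and the path to the diff report.
- /pipelines/hash_artifact | /sign_artifact
  - Computes sha256 and signs or verifies signatures; results are reconciled against export_manifest.artifacts[].
- /pipelines/publish_release | /revoke_release
  - Follows the versioning and notice policy: semver, stability_line, notice.type. A revocation creates a tombstone and syncs mirrors and indexes.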
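The topology part of /pipelines/validate and the DAG produced by /pipelines/plan both depend on the stage graph being acyclic. The sketch below shows one standard way to check that, Kahn's algorithm. It assumes stages are identified by strings and edges are (source, destination) pairs; the chapter does not define the actual shape of layers/edges, so both the input format and the function name topological_order are illustrative.

```python
from collections import defaultdict, deque


def topological_order(nodes, edges):
    """Return stages in dependency order; raise ValueError on a cycle or unknown stage."""
    indegree = {n: 0 for n in nodes}
    downstream = defaultdict(list)
    for src, dst in edges:
        if src not in indegree or dst not in indegree:
            raise ValueError(f"edge references unknown stage: {src!r} -> {dst!r}")
        downstream[src].append(dst)
        indegree[dst] += 1

    # Start from stages with no upstream dependency and peel the graph layer by layer.
    ready = deque(n for n in indegree if indegree[n] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in downstream[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any stage left unvisited sits on a cycle.
    if len(order) != len(indegree):
        raise ValueError("cycle detected; pipeline is not a DAG")
    return order
```

A validator could map the ValueError to an ETOPO-style issue, while a planner could use the returned order as the execution schedule.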
VI. Error Codes (Normative)
errors:
  - {code: "ESCHEMA001", message: "missing required field", path: "$.pipeline.id", see: ["EFT.WP.Core.DataSpec v1.0:EXPORT"]}
  - {code: "ESEMVER001", message: "invalid semver", path: "$.pipeline.version"}
  - {code: "EDIM001", message: "units must be SI and check_dim=true", path: "$.metrology", see: ["EFT.WP.Core.Metrology v1.0:check_dim"]}
  - {code: "ETOPO001", message: "Σ_out -> Σ_in schema incompatible", path: "$.pipeline.edges[*]"}
  - {code: "ESPLIT001", message: "split ratios must sum to 1±1e-6", path: "$..stages[*].splits"}
  - {code: "ELEAK000", message: "cross-split leakage detected", path: "$..policy.leakage_guard"}
  - {code: "ESEC001", message: "credentials_ref required", path: "$..stages[?(@.type^='source.')].params"}
  - {code: "EPUB001", message: "publish gate not met", path: "$.export_manifest"}
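To make the ESPLIT001 rule concrete, the following sketch checks that a stage's split ratios sum to 1 within the stated tolerance of 1e-6 and, on failure, returns an Issue-shaped entry. It assumes splits is a mapping from split name to ratio, which the chapter does not specify; check_splits and its default path argument are hypothetical.

```python
import math


def check_splits(splits, path="$.stages[0].splits", tol=1e-6):
    """Return Issue-shaped dicts; an empty list means the ratios sum to 1 within `tol`."""
    # fsum avoids accumulating rounding error across many small ratios.
    total = math.fsum(splits.values())
    if abs(total - 1.0) <= tol:
        return []
    return [{
        "code": "ESPLIT001",
        "message": f"split ratios must sum to 1±1e-6 (got {total:.9f})",
        "path": path,
    }]
```

For example, {"train": 0.8, "val": 0.1, "test": 0.1} passes, while {"train": 0.8, "val": 0.1} yields one ESPLIT001 entry.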
VII. Idempotency, Version Negotiation, and Compatibility
idempotency:
  header: "x-eift-idempotency"
  window_hours: 24
versioning:
  api: "pipelines.v1"   # breaking change → bump MAJOR
  minor: "backward-compatible additions"
compatibility:
  request_backward: "minor+patch"
  response_fields: "additive only; fields are never removed"
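The idempotency settings above imply that a repeated request carrying the same x-eift-idempotency key within 24 hours should not be executed twice. A minimal in-memory sketch of that behaviour follows. It assumes the server replays the stored response for a repeated key, which is a common convention but is not stated in this chapter; IdempotencyStore and run_once are illustrative names.

```python
import time


class IdempotencyStore:
    """Remember responses per idempotency key for `window_hours`."""

    def __init__(self, window_hours=24, clock=time.time):
        self.window_s = window_hours * 3600
        self.clock = clock
        self._seen = {}  # key -> (stored_at, response)

    def run_once(self, key, handler):
        """Call `handler` only if `key` was not seen within the window; otherwise replay."""
        now = self.clock()
        hit = self._seen.get(key)
        if hit is not None and now - hit[0] < self.window_s:
            return hit[1]  # same key inside the window: return the stored response
        response = handler()
        self._seen[key] = (now, response)
        return response
```

A real deployment would need shared, persistent storage and eviction of expired keys; this sketch only demonstrates the windowing logic.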
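VIII. Security, Audit, and Compliance
- Authentication: OIDC/HMAC. Transport: TLS1.2+. Least privilege: authorization is granted per scope.
- Audit: record request_id, idempotency_key, caller, timestamp, and digest; logs feed the compliance module and are registered in the exported artifacts.
- Compliance: regional restrictions and data-subject rights are handled per Chapter 14; publish/revoke is consistent with the versioning policy.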
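The audit fields listed above can be assembled as a small record per request. The sketch below is one possible shape: it assumes the digest is a SHA-256 over a canonical JSON encoding of the request body, and that timestamps are UTC ISO 8601. Neither choice is specified in this chapter, and audit_record is a hypothetical helper.

```python
import hashlib
import json
import uuid
from datetime import datetime, timezone


def audit_record(caller, idempotency_key, body, request_id=None, now=None):
    """Build an audit entry with the five fields named in the audit requirement."""
    # Canonical encoding (sorted keys, no extra whitespace) so equal bodies hash equally.
    payload = json.dumps(
        body, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")
    return {
        "request_id": request_id or str(uuid.uuid4()),
        "idempotency_key": idempotency_key,
        "caller": caller,
        "timestamp": (now or datetime.now(timezone.utc)).isoformat(),
        "digest": "sha256:" + hashlib.sha256(payload).hexdigest(),
    }
```

Hashing a canonical form rather than the raw bytes means two requests that differ only in key order produce the same digest, which makes duplicate detection in the audit log simpler.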
IX. Machine-Readable Implementation Stubs (Ixx-? Prototypes)
from __future__ import annotations  # lets the `X | None` hints below load on Python < 3.10

# Each stub mirrors one endpoint in Section II and returns a response-envelope dict.
def validate_pipeline(spec: dict) -> dict: ...
def lint_pipeline(spec: dict, rules: dict) -> dict: ...
def check_units(spec: dict) -> dict: ... # uses Core.Metrology v1.0:check_dim
def verify_references(spec: dict) -> dict: ... # regex + anchor reachability
def plan(spec: dict, resources: dict | None = None) -> dict: ...
def run(spec: dict, mode: str = "async") -> dict: ...
def metrics(run_id: str, since: str | None = None, until: str | None = None) -> dict: ...
def lineage(spec: dict | None = None, run_id: str | None = None) -> dict: ...
def replay(run_id: str, policy: str = "strict") -> dict: ...
def hash_artifact(path: str | bytes) -> dict: ...
def sign_artifact(path: str | bytes, key_id: str) -> dict: ...
def publish_release(spec: dict) -> dict: ...
def revoke_release(tag: str, reason: str) -> dict: ...
X. Example Calls (Ready to Use)
# Structural + cross-volume validation
curl -s -X POST https://api.eift.org/api/v1/pipelines/validate \
-H "Authorization: Bearer <token>" \
-H "x-eift-idempotency: 7b7a0b1e-0a21-4f3f-9d0b-3b1e9b1f3c22" \
-H "Content-Type: application/json" \
-d @pipeline.json
# Planning and execution
# (the Authorization, x-eift-idempotency, and Content-Type headers are omitted below for brevity; send them as in the first call)
curl -s -X POST https://api.eift.org/api/v1/pipelines/plan -d @pipeline.json
curl -s -X POST https://api.eift.org/api/v1/pipelines/run -d '{"spec": {...}, "options":{"mode":"async"}}'
# Metrics and lineage
curl -s -X POST https://api.eift.org/api/v1/pipelines/metrics -d '{"filters":{"run_id":"RUN-123"}}'
curl -s -X POST https://api.eift.org/api/v1/pipelines/lineage -d '{"filters":{"run_id":"RUN-123"}}'
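For callers who prefer Python over curl, the same requests can be built with the standard library. The sketch below only constructs the request object and does not send it. The base URL and header names are taken from the calls above; the helper name build_request is hypothetical, and generating a fresh UUID when no idempotency key is supplied is an assumption about sensible client behaviour.

```python
import json
import urllib.request
import uuid

BASE_URL = "https://api.eift.org/api/v1/pipelines"


def build_request(endpoint, body, token, idempotency_key=None):
    """Build a POST request carrying the three headers of the request envelope."""
    data = json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/{endpoint}",
        data=data,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "x-eift-idempotency": idempotency_key or str(uuid.uuid4()),
            "Content-Type": "application/json",
        },
    )
```

With a real token, the request can be sent with urllib.request.urlopen(req) and the JSON response decoded with json.load. Reusing the same idempotency_key on a retry is what lets the server recognise the duplicate.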
XI. Coupling with the Export Manifest (Normative)
export_manifest:
  artifacts:
    - {path: "api/openapi.yaml", sha256: "..."}
    - {path: "api/clients/python.tar.gz", sha256: "..."}
    - {path: "runs/RUN-123/metrics.json", sha256: "..."}
    - {path: "runs/RUN-123/lineage.graph", sha256: "..."}
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
    - "EFT.WP.Data.Pipeline v1.0:Ch.11"
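Checking artifacts against the manifest amounts to recomputing each file's SHA-256 and comparing it with the recorded value. The sketch below does that for a manifest already loaded into a dict with the artifacts[] shape shown above. The function names sha256_file and verify_manifest and the root parameter are illustrative, and treating a missing file as a mismatch is an assumption.

```python
import hashlib
from pathlib import Path


def sha256_file(path, chunk_size=1 << 20):
    """Hash a file in chunks so large artifacts do not need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_manifest(manifest, root="."):
    """Return the paths that are missing or whose hash differs from the manifest."""
    mismatches = []
    for entry in manifest["artifacts"]:
        target = Path(root) / entry["path"]
        if not target.is_file() or sha256_file(target) != entry["sha256"]:
            mismatches.append(entry["path"])
    return mismatches
```

An empty return value means every listed artifact is present and matches its recorded hash.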
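XII. Compliance Self-Check for This Chapter
- The blocking endpoints (validate|check_units|verify_references|plan|run) are implemented with authentication, idempotency, and rate limiting enabled.
- Reference anchors use the form "Volume vX.Y:anchor" and appear in export_manifest.references[]; no short codes and no unversioned references.
- Metrology checks are in effect (units="SI", check_dim=true); topology, splits, leakage guards, and the security minimum pass lint.
- Publish/revoke follows the versioning policy; the OpenAPI/SDK artifacts and the runtime metrics/lineage artifacts are listed in the export manifest and can be verified.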
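The "Volume vX.Y:anchor" form required above lends itself to a regular-expression check. The pattern below is inferred from the three references that appear in this chapter and is only an assumption about the full grammar. It covers the format half of verify_references, not anchor reachability, and parse_reference is a hypothetical helper.

```python
import re

# Volume name, a space, "v" + MAJOR.MINOR, a colon, then a non-empty anchor.
ANCHOR_RE = re.compile(
    r"^(?P<volume>[A-Za-z][\w.]*) v(?P<version>\d+\.\d+):(?P<anchor>\S+)$"
)


def parse_reference(ref):
    """Return the parts of a well-formed reference, or None if it is malformed."""
    match = ANCHOR_RE.match(ref)
    return match.groupdict() if match else None
```

A reference without a version, such as "EFT.WP.Core.DataSpec:EXPORT", returns None, which matches the rule that unversioned references are not allowed.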
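Copyright and license: Unless otherwise stated, the copyright in "Energy Filament Theory" (《能量丝理论》), including text, charts, illustrations, symbols, and formulas, belongs to the author, Guanglin Tu (屠广林).
License (CC BY 4.0): Copying, reposting, excerpting, adaptation, and redistribution are permitted provided the author and source are credited.
Suggested attribution: Author: Guanglin Tu | Work: "Energy Filament Theory" | Source: energyfilament.org | License: CC BY 4.0
Call for verification: The author works independently and is self-funded, with no employer and no sponsorship. The next phase will prioritize environments that are most willing to discuss, reproduce, and scrutinize the work in public, regardless of country. Media and peers worldwide are welcome to organize verification and to get in touch.
Version information: First published: 2025-11-11 | Current version: v6.0+5.05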