目录文档-技术白皮书(V5.05)45-EFT.WP.Data.Pipeline v1.0

第17章 实现绑定与执行 API


I. 章节目的与范围

:接口原型、请求/响应信封、错误码、鉴权与幂等、速率限制与版本协商;覆盖流水线校验、计量校核、引用锚点检查、拓扑规划与执行、运行度量与血缘、回放与发布/撤回;与数据契约、计量口径、引用锚点与导出清单对齐。执行 API规范性实现绑定提供

II. 服务面(规范性)

services:

pipelines.v1:

# 校验与检查

- POST /api/v1/pipelines/validate # 结构/依赖/拓扑与跨卷校验(阻断)

- POST /api/v1/pipelines/lint # Lint 规则执行

- POST /api/v1/pipelines/check_units # 计量/单位一致性

- POST /api/v1/pipelines/verify_references # 引用锚点格式与可达性

# 规划与执行

- POST /api/v1/pipelines/plan # DAG 规划与资源评估

- POST /api/v1/pipelines/run # 触发执行(同步/异步)

- POST /api/v1/pipelines/metrics # 运行指标查询(QPS/p99/ρ/成本)

- POST /api/v1/pipelines/lineage # 生成/查询血缘图

- POST /api/v1/pipelines/replay # 依据 inputs_lock 回放

# 工件与发布

- POST /api/v1/pipelines/hash_artifact # 工件哈希

- POST /api/v1/pipelines/sign_artifact # 工件签名/验签

- POST /api/v1/pipelines/publish_release # 发布

- POST /api/v1/pipelines/revoke_release # 撤回


III. 通用请求/响应与鉴权

request_envelope:

headers:

Authorization: "Bearer <oidc-token> | HMAC <key>:<sig>"

x-eift-idempotency: "<uuid>" # 幂等键(≥24h 有效)

content-type: "application/json"

body:

spec?: { ... } # pipeline 规范(YAML/JSON)

options?: {dry_run?: true, strict?: true}

filters?: {run_id?: "<id>", since?: "<ISO8601>", until?: "<ISO8601>"}

artifacts?: [{path, bytes_b64?, sha256?}]

response_envelope:

status: "ok" | "warn" | "error"

errors: [{code, message, path?, see?}]

warnings:[{code, message, path?, see?}]

metrics: { ... } # 规划/执行/计量/成本统计

data?: { ... } # 结构化结果(DAG/血缘/运行明细)

version: "pipelines.v1"

security:

auth: "OIDC bearer | HMAC"

tls: "TLS1.2+"

scope: ["validate","execute","metrics","lineage","publish","admin"]

rate_limits:

per_key_per_minute: 120

burst: 60


IV. 规范性 OpenAPI 摘录

openapi: 3.0.3

info: {title: "EFT Pipelines API", version: "v1"}

paths:

/api/v1/pipelines/validate:

post:

summary: Validate pipeline spec against schema, topology, and cross-volume constraints

requestBody: {required: true, content: {"application/json": {schema: {$ref: "#/components/schemas/SpecEnvelope"}}}}

responses:

"200": {description: "Result", content: {"application/json": {schema: {$ref: "#/components/schemas/Result"}}}}

/api/v1/pipelines/run:

post:

summary: Execute pipeline (sync/async)

requestBody: {required: true, content: {"application/json": {schema: {$ref: "#/components/schemas/RunRequest"}}}}

responses:

"200": {description: "Run accepted", content: {"application/json": {schema: {$ref: "#/components/schemas/RunResult"}}}}

components:

schemas:

SpecEnvelope: {type: object, properties: {spec: {}, options: {type: object}}}

Result:

type: object

properties:

status: {type: string, enum: [ok, warn, error]}

errors: {type: array, items: {$ref: "#/components/schemas/Issue"}}

warnings:{type: array, items: {$ref: "#/components/schemas/Issue"}}

metrics: {type: object}

data: {type: object}

RunRequest:

type: object

properties:

spec: {}

options: {type: object, properties:{mode:{type:string, enum:["sync","async"]}}}

RunResult:

type: object

properties:

run_id: {type: string}

state: {type: string, enum: ["queued","running","succeeded","failed"]}

dag: {type: object}

Issue:

type: object

properties:

code: {type: string}

message: {type: string}

path: {type: string}

see: {type: array, items: {type: string}}


V. 端点语义(要点)

  1. /pipelines/validate(阻断)
  1. /pipelines/lint
  1. /pipelines/plan
  1. /pipelines/run
  1. /pipelines/metrics
  1. /pipelines/lineage
  1. /pipelines/replay
  1. /pipelines/hash_artifact | /sign_artifact
  1. /pipelines/publish_release | /revoke_release

VI. 错误码(规范性)

errors:

- {code:"ESCHEMA001", message:"missing required field", path:"$.pipeline.id", see:["EFT.WP.Core.DataSpec v1.0:EXPORT"]}

- {code:"ESEMVER001", message:"invalid semver", path:"$.pipeline.version"}

- {code:"EDIM001", message:"units must be SI and check_dim=true",path:"$.metrology", see:["EFT.WP.Core.Metrology v1.0:check_dim"]}

- {code:"ETOPO001", message:"Σ_out -> Σ_in schema incompatible", path:"$.pipeline.edges[*]"}

- {code:"ESPLIT001", message:"split ratios must sum to 1±1e-6", path:"$..stages[*].splits"}

- {code:"ELEAK000", message:"cross-split leakage detected", path:"$..policy.leakage_guard"}

- {code:"ESEC001", message:"credentials_ref required", path:"$..stages[?(@.type^='source.')].params"}

- {code:"EPUB001", message:"publish gate not met", path:"$.export_manifest"}


VII. 幂等性、版本协商与兼容性

idempotency:

header: "x-eift-idempotency"

window_hours: 24

versioning:

api: "pipelines.v1" # 破坏性变更 → 提升 MAJOR

minor: "向后兼容新增"

compatibility:

request_backward: "minor+patch"

response_fields: "新增仅追加,不移除"


VIII. 安全、审计与合规


IX. 机器可读实现片段(Ixx-? 原型)

def validate_pipeline(spec: dict) -> dict: ...

def lint_pipeline(spec: dict, rules: dict) -> dict: ...

def check_units(spec: dict) -> dict: ... # uses Core.Metrology v1.0:check_dim

def verify_references(spec: dict) -> dict: ... # regex + anchor reachability

def plan(spec: dict, resources: dict | None = None) -> dict: ...

def run(spec: dict, mode: str = "async") -> dict: ...

def metrics(run_id: str, since: str | None = None, until: str | None = None) -> dict: ...

def lineage(spec: dict | None = None, run_id: str | None = None) -> dict: ...

def replay(run_id: str, policy: str = "strict") -> dict: ...

def hash_artifact(path: str | bytes) -> dict: ...

def sign_artifact(path: str | bytes, key_id: str) -> dict: ...

def publish_release(spec: dict) -> dict: ...

def revoke_release(tag: str, reason: str) -> dict: ...


X. 示例调用(可直接使用)

# 结构 + 跨卷校验

curl -s -X POST https://api.eift.org/api/v1/pipelines/validate \

-H "Authorization: Bearer <token>" \

-H "x-eift-idempotency: 7b7a0b1e-0a21-4f3f-9d0b-3b1e9b1f3c22" \

-H "Content-Type: application/json" \

-d @pipeline.json

# 规划与执行

curl -s -X POST https://api.eift.org/api/v1/pipelines/plan -d @pipeline.json

curl -s -X POST https://api.eift.org/api/v1/pipelines/run -d '{"spec": {...}, "options":{"mode":"async"}}'

# 指标与血缘

curl -s -X POST https://api.eift.org/api/v1/pipelines/metrics -d '{"filters":{"run_id":"RUN-123"}}'

curl -s -X POST https://api.eift.org/api/v1/pipelines/lineage -d '{"filters":{"run_id":"RUN-123"}}'


XI. 与导出清单的耦合(规范性)

export_manifest:

artifacts:

- {path:"api/openapi.yaml", sha256:"..."}

- {path:"api/clients/python.tar.gz", sha256:"..."}

- {path:"runs/RUN-123/metrics.json", sha256:"..."}

- {path:"runs/RUN-123/lineage.graph", sha256:"..."}

references:

- "EFT.WP.Core.DataSpec v1.0:EXPORT"

- "EFT.WP.Core.Metrology v1.0:check_dim"

- "EFT.WP.Data.Pipeline v1.0:Ch.11"


XII. 本章合规自检


版权与许可:除另有说明外,《能量丝理论》(含文本、图表、插图、符号与公式)的著作权由作者(屠广林)享有。
许可方式(CC BY 4.0):在注明作者与来源的前提下,允许复制、转载、节选、改编与再分发。
署名格式(建议):作者:屠广林|作品:《能量丝理论》|来源:energyfilament.org|许可证:CC BY 4.0
验证召集: 作者独立自费、无雇主无资助;下一阶段将优先在最愿意公开讨论、公开复现、公开挑错的环境中推进落地,不限国家。欢迎各国媒体与同行抓住窗口组织验证,并与我们联系。
版本信息: 首次发布:2025-11-11 | 当前版本:v6.0+5.05