Catalog Document - Technical Whitepaper 45 - EFT.WP.Data.Pipeline v1.0

Chapter 3 Pipeline Layering and Overview


I. Purpose and Scope of This Chapter

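This chapter specifies the layered architecture, topology, and contracts of the EFT data pipeline: layers[]/edges[], stage/operator, inputs/outputs Σ_in/Σ_out, quality gates and failure propagation, resources and scheduling, and audit and export artifacts. All of these must stay consistent with the dataset cards and model cards, the metrology chapter, and the citation anchors.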

II. Layered Architecture (Normative)

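  1. Six-layer decomposition
    • ingest (data sources and ingestion)
    • validate (schema/contract validation and DQ quality gates)
    • transform (transformation and preprocessing)
    • feature (feature pipelines and reuse)
    • export (splitting/distribution and release manifests)
    • monitor (monitoring, logging, and observability)
  2. Intra-layer elements: stages[] (ordered) plus operators (implementation bindings); across layers, edges[] make producer/consumer relationships explicit. Every stage must declare its inputs/outputs Σ_in/Σ_out and its I/O contract.
  3. Failure propagation: each layer defines its failure semantics and handling policy, {on_fail:"block|skip|quarantine", retries:{max, backoff}, idempotent:true}; every failure event must be written to the audit trail and raise an alert.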
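The failure semantics in item 3 can be sketched as a retry wrapper around a single stage. This is a minimal Python illustration under stated assumptions, not the pipeline's runtime: FailurePolicy, StageResult, StageBlocked, run_stage and base_delay_s are hypothetical names, reading backoff "expo" as "double the delay on each retry" is an assumption, and the audit trail and alerting are reduced to a list and a callback.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class FailurePolicy:
    on_fail: str = "block"      # "block" | "skip" | "quarantine"
    max_retries: int = 3        # retries.max
    backoff: str = "expo"       # "expo" doubles the delay per retry; anything else keeps it constant
    base_delay_s: float = 1.0   # hypothetical: delay before the first retry


@dataclass
class StageResult:
    status: str                 # "ok" | "skipped" | "quarantined"
    value: Any = None
    attempts: int = 0
    audit: List[str] = field(default_factory=list)


class StageBlocked(RuntimeError):
    """Raised when retries are exhausted under on_fail == "block"; carries the audit trail."""

    def __init__(self, stage: str, audit: List[str]):
        super().__init__(f"stage {stage!r} blocked the pipeline")
        self.audit = audit


def run_stage(name: str, fn: Callable[[Any], Any], payload: Any, policy: FailurePolicy,
              sleep: Callable[[float], None] = time.sleep,
              alert: Callable[[str], None] = print) -> StageResult:
    """Run an (assumed idempotent) stage with bounded retries, then apply on_fail."""
    audit: List[str] = []
    for attempt in range(policy.max_retries + 1):
        try:
            return StageResult("ok", fn(payload), attempt + 1, audit)
        except Exception as exc:  # every failed attempt is recorded in the audit trail
            audit.append(f"{name}: attempt {attempt + 1} failed: {exc!r}")
            if attempt < policy.max_retries:
                factor = 2 ** attempt if policy.backoff == "expo" else 1
                sleep(policy.base_delay_s * factor)
    attempts = policy.max_retries + 1
    audit.append(f"{name}: giving up after {attempts} attempts, on_fail={policy.on_fail}")
    alert(f"ALERT {name}: on_fail={policy.on_fail}")
    if policy.on_fail == "skip":
        return StageResult("skipped", None, attempts, audit)
    if policy.on_fail == "quarantine":
        return StageResult("quarantined", payload, attempts, audit)  # keep the input for inspection
    raise StageBlocked(name, audit)  # "block" (and any unrecognized value) stops the pipeline
```

Injecting sleep and alert keeps the sketch testable; retrying is only safe because the policy requires idempotent:true, so a partially completed attempt can be re-run without side effects.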

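III. Topology and Connections (layers[] / edges[])

  1. Topology rule: DAGs are preferred; controlled cycles are allowed only if they are whitelisted in lint_rules together with explicit convergence and termination conditions.
  2. Connection constraints
    • For each edge edge.from -> edge.to, Σ_out(from) and Σ_in(to) must be compatible (schema, units, and dimensions).
    • Three connection kinds are allowed, broadcast, partition, and merge; each edge must state its kind in edge.kind and give the key columns or hashing strategy.
  3. Contract evolution: whenever the version of an upstream schema_ref changes, the consuming layer must declare how it is handled via compat_mode:"forward|backward|break".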
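The topology rule and the edge-compatibility constraint can both be checked mechanically. The sketch below assumes stages identified by name, edges reduced to (producer, consumer) pairs, a whitelisted controlled cycle modeled as a back edge that is excluded from ordering, and Σ represented as a field -> (dtype, unit) map. topo_order and schema_compat are hypothetical helpers (schema_compat only borrows its name from the lint rule TOPOLOGY.EDGES_COMPAT), and units are compared as exact strings, which is stricter than a real dimensional check.

```python
from collections import defaultdict, deque
from typing import Dict, FrozenSet, List, Tuple

Edge = Tuple[str, str]                  # (producer stage, consumer stage)
Schema = Dict[str, Tuple[str, str]]     # field -> (dtype, unit)


def topo_order(stages: List[str], edges: List[Edge],
               allowed_back_edges: FrozenSet[Edge] = frozenset()) -> List[str]:
    """Kahn's algorithm; raises if a cycle remains after dropping whitelisted back edges."""
    indeg = {s: 0 for s in stages}
    succ: Dict[str, List[str]] = defaultdict(list)
    for edge in edges:
        if edge in allowed_back_edges:  # controlled cycle: re-runs are governed by its own stop condition
            continue
        src, dst = edge
        succ[src].append(dst)
        indeg[dst] += 1
    queue = deque(s for s in stages if indeg[s] == 0)
    order: List[str] = []
    while queue:
        s = queue.popleft()
        order.append(s)
        for t in succ[s]:
            indeg[t] -= 1
            if indeg[t] == 0:
                queue.append(t)
    if len(order) != len(stages):
        stuck = sorted(s for s in stages if indeg[s] > 0)
        raise ValueError(f"non-whitelisted cycle through {stuck}")
    return order


def schema_compat(sigma_out: Schema, sigma_in: Schema) -> List[str]:
    """List mismatches between a producer's Σ_out and a consumer's Σ_in (empty list = compatible)."""
    problems = []
    for name, (dtype, unit) in sigma_in.items():
        if name not in sigma_out:
            problems.append(f"missing field {name!r}")
            continue
        out_dtype, out_unit = sigma_out[name]
        if out_dtype != dtype:
            problems.append(f"{name}: dtype {out_dtype} != {dtype}")
        if out_unit != unit:
            problems.append(f"{name}: unit {out_unit} != {unit}")
    return problems
```

Only fields the consumer declares in Σ_in are checked, so a producer may emit extra fields; this is one possible reading of "compatible", roughly what compat_mode "forward" would tolerate.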

IV. Quality Gates and Failure Handling (Consistent Across Layers)


V. Resources and Scheduling (resources / scheduling)


VI. I/O Contracts and Schemas (Σ_in/Σ_out)


VII. Machine-Readable Form (Minimal Working Example, Normative)

pipeline:
  id: "eift.ingest-validate-transform-export"
  version: "v1.0"
  layers:
    - name: "ingest"
      stages:
        - name: "src.s3.pull"
          type: "source.s3"
          impl: "I16-1.s3_pull"
          params: {bucket: "s3://data", prefix: "/raw/2025/"}
          outputs: ["raw_blob"]
          idempotent: true
          retries: {max: 3, backoff: "expo"}
          timeout_s: 1800
    - name: "validate"
      stages:
        - name: "schema.check"
          type: "validate.schema"
          impl: "I16-2.schema_check"
          inputs: ["raw_blob"]
          outputs: ["raw_rows"]
          schema_ref: "contracts/raw_rows@v1.2"
          dq_rules: ["DQ_001:not_null", "DQ_007:unique_key"]
          on_fail: "quarantine"
    - name: "transform"
      stages:
        - name: "normalize"
          type: "transform.standardize"
          impl: "I16-3.standardize"
          inputs: ["raw_rows"]
          outputs: ["std_rows"]
          params: {method: "zscore", stats_from: "train-only"}
    - name: "feature"
      stages:
        - name: "feat.map"
          type: "feature.map"
          impl: "I16-4.feature_map"
          inputs: ["std_rows"]
          outputs: ["feat_rows"]
          feature_space: {type: "tabular", shape: "(N,D)", dtype: "float32"}
    - name: "export"
      stages:
        - name: "split.package"
          type: "export.splits"
          impl: "I16-5.split_package"
          inputs: ["feat_rows"]
          outputs: ["train_pkg", "val_pkg", "test_pkg"]
          splits: {train: 0.8, validation: 0.1, test: 0.1}
          policy: {leakage_guard: ["per-object", "per-timewindow"]}
    - name: "monitor"
      stages:
        - name: "metrics.push"
          type: "monitor.metrics"
          impl: "I16-6.metrics_push"
          inputs: ["feat_rows"]
          params: {targets: ["qps", "latency_ms.p99", "error_rate"]}
  edges:
    - {from: "src.s3.pull:raw_blob", to: "schema.check:raw_blob"}
    - {from: "schema.check:raw_rows", to: "normalize:raw_rows"}
    - {from: "normalize:std_rows", to: "feat.map:std_rows"}
    - {from: "feat.map:feat_rows", to: "split.package:feat_rows"}
    - {from: "feat.map:feat_rows", to: "metrics.push:feat_rows"}
  resources: {cpu: 8, mem_gb: 32, gpu: 0, net_mbps: 800}
  scheduling: {orchestrator: "airflow", cron: "5 * * * *"}
  quality_gates: ["schema.ok", "dq.thresholds", "privacy.checks", "splits.frozen"]
  metrology: {units: "SI", check_dim: true}
  export_manifest:
    version: "v1.0"
    references:
      - "EFT.WP.Core.DataSpec v1.0:EXPORT"
      - "EFT.WP.Core.Metrology v1.0:check_dim"
      - "EFT.WP.Data.DatasetCards v1.0:Ch.11"
    artifacts:
      - {path: "pipeline.yaml", sha256: "..."}
      - {path: "dq/report.jsonl", sha256: "..."}
      - {path: "metrics/runtime.csv", sha256: "..."}
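The split.package stage above combines fixed split fractions with a per-object leakage guard. One way such a guard could work, assumed here rather than taken from the I16-5.split_package implementation, is to hash each object's key with a salted SHA-256 and map it to a split by cumulative fraction, so that all rows of an object land in the same split and the assignment is reproducible. split_of, split_rows, the object_id column and the salt are hypothetical; per-timewindow guarding is not covered.

```python
import hashlib
from typing import Any, Dict, List


def split_of(object_id: str, splits: Dict[str, float], salt: str = "v1") -> str:
    """Deterministically map one object to a split via a salted SHA-256 hash."""
    if not splits or abs(sum(splits.values()) - 1) > 1e-6:
        raise ValueError("split fractions must be non-empty and sum to 1")
    digest = hashlib.sha256(f"{salt}:{object_id}".encode("utf-8")).digest()
    # Keep 53 bits so the division is exact and u lies in [0, 1).
    u = (int.from_bytes(digest[:8], "big") >> 11) / 2**53
    cumulative = 0.0
    for name, frac in splits.items():
        cumulative += frac
        if u < cumulative:
            return name
    return list(splits)[-1]  # absorbs floating-point shortfall when fractions sum to ~1


def split_rows(rows: List[Dict[str, Any]], key: str, splits: Dict[str, float],
               salt: str = "v1") -> Dict[str, List[Dict[str, Any]]]:
    """Per-object guard: every row sharing the same `key` value lands in the same split."""
    out: Dict[str, List[Dict[str, Any]]] = {name: [] for name in splits}
    for row in rows:
        out[split_of(str(row[key]), splits, salt)].append(row)
    return out
```

Because the split is a pure function of (salt, object id), rerunning the stage reproduces the same partition, which is one way to honor a "splits.frozen" gate; changing the salt deliberately reshuffles it.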


VIII. Constraints and Lint Rules (Excerpt, Normative)

lint_rules:
  - id: STRUCT.LAYERS_NOT_EMPTY
    when: "$.pipeline.layers"
    assert: "len(value) > 0"
    level: error
  - id: TOPOLOGY.EDGES_COMPAT
    when: "$.pipeline.edges[*]"
    assert: "schema_compat(edge.from.Σ_out, edge.to.Σ_in)"
    level: error
  - id: SPLITS.SUM_TO_ONE
    when: "$.pipeline.layers[*].stages[*].splits"
    assert: "abs(train+validation+test - 1) <= 1e-6"
    level: error
  - id: LEAKAGE.GUARDS_PRESENT
    when: "$.pipeline.layers[*].stages[*].policy.leakage_guard"
    assert: "contains_any(['per-object','per-timewindow','per-scene'])"
    level: error
  - id: METROLOGY.SI_AND_CHECKDIM
    when: "$.pipeline.metrology"
    assert: "units == 'SI' and check_dim == true"
    level: error
  - id: RETRIES.TIMEOUT_DEFINED
    when: "$.pipeline.layers[*].stages[*]"
    assert: "idempotent == true and retries.max >= 0 and timeout_s > 0"
    level: warn
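The rules above use JSONPath-style `when` selectors and expression strings whose evaluator is not specified in this chapter. As a sketch under assumptions, the Python below hand-translates five of them into predicates over a pipeline document loaded as nested dicts (for example by a YAML parser); a rule fires only when its selected node exists, which is an assumed reading of `when`. TOPOLOGY.EDGES_COMPAT is left out because it needs the Σ_in/Σ_out schemas. lint, iter_stages and the (rule id, level, location) finding layout are hypothetical.

```python
from typing import Any, Dict, Iterator, List, Tuple

Finding = Tuple[str, str, str]  # (rule id, level, location)
GUARDS = {"per-object", "per-timewindow", "per-scene"}


def iter_stages(doc: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Hand-written equivalent of the selector $.pipeline.layers[*].stages[*]."""
    for layer in doc["pipeline"].get("layers", []):
        yield from layer.get("stages", [])


def lint(doc: Dict[str, Any]) -> List[Finding]:
    findings: List[Finding] = []
    pipeline = doc["pipeline"]
    if len(pipeline.get("layers", [])) == 0:  # a missing list is treated as empty
        findings.append(("STRUCT.LAYERS_NOT_EMPTY", "error", "$.pipeline.layers"))
    for stage in iter_stages(doc):
        where = stage.get("name", "<unnamed>")
        splits = stage.get("splits")
        if splits is not None:
            total = sum(splits.get(k, 0.0) for k in ("train", "validation", "test"))
            if abs(total - 1) > 1e-6:
                findings.append(("SPLITS.SUM_TO_ONE", "error", where))
        guards = stage.get("policy", {}).get("leakage_guard")
        if guards is not None and not GUARDS & set(guards):
            findings.append(("LEAKAGE.GUARDS_PRESENT", "error", where))
        retries = stage.get("retries", {})
        if not (stage.get("idempotent") is True
                and retries.get("max", -1) >= 0
                and stage.get("timeout_s", 0) > 0):
            findings.append(("RETRIES.TIMEOUT_DEFINED", "warn", where))
    metrology = pipeline.get("metrology")
    if metrology is not None and not (metrology.get("units") == "SI"
                                      and metrology.get("check_dim") is True):
        findings.append(("METROLOGY.SI_AND_CHECKDIM", "error", "$.pipeline.metrology"))
    return findings
```

Note that RETRIES.TIMEOUT_DEFINED applies to every stage, so in the Section VII example only src.s3.pull, the one stage that declares idempotent, retries and timeout_s, would pass it without a warning.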


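IX. Metrology and Path Quantities (If Applicable)

  1. Performance and resources: use SI units consistently, e.g. QPS (1/s), T_inf (ms), C_cpu (cores), C_mem (GiB, an IEC binary unit used alongside SI), P (W).
  2. Path quantity T_arr: if any transformation or correction involves arrival time, the corresponding stage must register:
    • delta_form:"const-factor|general", path:"gamma(ell)", measure:"d ell";
    • one of the two equivalent expressions (equivalent because the constant c_ref can be moved in or out of the integral):
      1. T_arr = ( 1 / c_ref ) * ( ∫ n_eff d ell )
      2. T_arr = ( ∫ ( n_eff / c_ref ) d ell )
        Either form must pass the check_dim dimensional check.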
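A numerical sketch of the two T_arr forms and of a check_dim-style dimensional check follows. It assumes gamma is parameterized by arc length ell on [0, length] in metres, n_eff is dimensionless, and c_ref is a constant speed in m/s; the speed of light is used only as an example value for c_ref. trapezoid, t_arr_factored, t_arr_integrand and t_arr_dim_ok are hypothetical names, and the trapezoidal rule is just one convenient quadrature.

```python
from typing import Callable, Dict

SPEED_OF_LIGHT = 299_792_458.0  # m/s; example value for c_ref only


def trapezoid(f: Callable[[float], float], a: float, b: float, n: int = 1000) -> float:
    """Composite trapezoidal rule on [a, b] with n panels."""
    h = (b - a) / n
    total = 0.5 * (f(a) + f(b))
    for i in range(1, n):
        total += f(a + i * h)
    return total * h


def t_arr_factored(n_eff: Callable[[float], float], length_m: float, c_ref: float) -> float:
    """Form 1: T_arr = (1 / c_ref) * ∫ n_eff d ell."""
    return (1.0 / c_ref) * trapezoid(n_eff, 0.0, length_m)


def t_arr_integrand(n_eff: Callable[[float], float], length_m: float, c_ref: float) -> float:
    """Form 2: T_arr = ∫ (n_eff / c_ref) d ell."""
    return trapezoid(lambda ell: n_eff(ell) / c_ref, 0.0, length_m)


Dim = Dict[str, int]  # exponents over SI base units, e.g. {"m": 1, "s": -1}


def dim_mul(*dims: Dim) -> Dim:
    """Multiply dimensions by adding exponents; drop units whose exponent cancels to zero."""
    out: Dim = {}
    for d in dims:
        for unit, power in d.items():
            out[unit] = out.get(unit, 0) + power
    return {u: p for u, p in out.items() if p != 0}


def dim_inv(d: Dim) -> Dim:
    return {u: -p for u, p in d.items()}


def t_arr_dim_ok() -> bool:
    """[n_eff] * [d ell] / [c_ref] must reduce to seconds."""
    n_eff, d_ell, c_ref = {}, {"m": 1}, {"m": 1, "s": -1}
    return dim_mul(n_eff, d_ell, dim_inv(c_ref)) == {"s": 1}
```

For a linear profile n_eff(ell) = 1 + ell/L over a 1000 m path, both forms give 1.5 · L / c_ref, about 5.0 microseconds, up to rounding.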

X. Export and Audit (export_manifest)


XI. Compliance Self-Check for This Chapter


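Copyright and License (CC BY 4.0)

Copyright notice: Unless otherwise stated, the copyright in Energy Filament Theory (including its text, charts, illustrations, symbols, and formulas) belongs to the author, Mr. Tu Guanglin (屠广林).
License: This work is licensed under the Creative Commons Attribution 4.0 International License (CC BY 4.0). Provided the author and source are credited, it may be copied, reposted, excerpted, adapted, and redistributed for commercial or non-commercial purposes.
Suggested attribution: Author: Tu Guanglin; Work: Energy Filament Theory; Source: energyfilament.org; License: CC BY 4.0.

First published: 2025-11-11 | Current version: v5.1
License link: https://creativecommons.org/licenses/by/4.0/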