目录 / 文档-技术白皮书 / 45-EFT.WP.Data.Pipeline v1.0
I. 章节目的与范围
固定 EFT 数据流水线的分层架构、拓扑与契约:layers[]/edges[]、stage/operator、输入/输出 Σ_in/Σ_out、质量门与异常传播、资源与调度、审计与导出物;确保与数据卡/模型卡、计量章与引用锚点一致。II. 分层架构(规范性)
- 六层分解:
- ingest(数据源与摄取)
- validate(Schema/契约校验与 DQ 质量门)
- transform(转换与预处理)
- feature(特征流水线与重用)
- export(切分/分发与发布清单)
- monitor(监控、日志与可观测性)
- 层内元素:stages[](有序)+ operators(实现绑定);层间通过 edges[] 明确生产/消费关系;任一 stage 必须声明输入/输出 Σ_in/Σ_out 与 I/O 契约。
- 异常传播:每层定义失败语义与处置策略:{on_fail:"block|skip|quarantine", retries:{max, backoff}, idempotent:true};失败事件必须进入审计与告警。
III. 拓扑与连接(layers[] / edges[])
- 拓扑规则:DAG 优先;允许受控环(需在 lint_rules 中放行并给出收敛与截止条件)。
- 连接约束:
- 边 edge.from -> edge.to 的 Σ_out(from) 与 Σ_in(to) 必须兼容(模式/单位/量纲)。
- 允许 broadcast/partition/merge 三类连接,需在 edge.kind 标注并给出键列或哈希策略。
- 契约演进:任何上游 schema_ref 版本变化,需在本层 compat_mode:"forward|backward|break" 明确处理口径。
IV. 质量门与异常处置(跨层一致)
- 质量门:在 validate 层配置 dq_rules[](统计阈值、完整性、唯一性、范围/分布、时间一致性、泄漏检查)。
- 阻断与预警:每条规则需标注等级:block|warn;阻断即停止下游并产出隔离工件与报告。
- 审计轨:质量门结果写入 dq/report.jsonl,并在 export_manifest.artifacts[] 登记 sha256。
V. 资源与调度(resources / scheduling)
- 资源画像:{cpu, mem_gb, gpu, disk_gb, net_mbps};性能度量统一:QPS、T_inf(ms)、利用率 ρ。
- 调度:orchestrator:"airflow|argo|ray",cron:"<CRON>" 或基于事件的 trigger;支持优先级与抢占;超时与重试策略在 stage 级定义。
- SLA/SLO:以 {latency_ms:{p50,p95,p99}, availability, error_rate} 表达,并绑定 alert_rules[]。
VI. I/O 契约与 Schema(Σ_in/Σ_out)
- 契约字段:schema_ref(版本化引用)、compat_mode、evolution_policy;数据类型、主键/分区键与单位/量纲在 Schema 内定义。
- 量纲与单位:所有物理/时间/频率/性能量统一 SI,metrology:{units:"SI", check_dim:true};合成前先单位归一。
- 切分一致:涉及训练/评测数据的下游产物,splits 必须与数据卡冻结切分一致。
VII. 机器可读(最小可用示例,规范性)
pipeline:
id: "eift.ingest-validate-transform-export"
version: "v1.0"
layers:
- name: "ingest"
stages:
- name: "src.s3.pull"
type: "source.s3"
impl: "I16-1.s3_pull"
params: {bucket:"s3://data", prefix:"/raw/2025/"}
outputs: ["raw_blob"]
idempotent: true
retries: {max: 3, backoff: "expo"}
timeout_s: 1800
- name: "validate"
stages:
- name: "schema.check"
type: "validate.schema"
impl: "I16-2.schema_check"
inputs: ["raw_blob"]
outputs: ["raw_rows"]
schema_ref: "contracts/raw_rows@v1.2"
dq_rules: ["DQ_001:not_null", "DQ_007:unique_key"]
on_fail: "quarantine"
- name: "transform"
stages:
- name: "normalize"
type: "transform.standardize"
impl: "I16-3.standardize"
inputs: ["raw_rows"]
outputs: ["std_rows"]
params: {method:"zscore", stats_from:"train-only"}
- name: "feature"
stages:
- name: "feat.map"
type: "feature.map"
impl: "I16-4.feature_map"
inputs: ["std_rows"]
outputs: ["feat_rows"]
feature_space: {type:"tabular", shape:"(N,D)", dtype:"float32"}
- name: "export"
stages:
- name: "split.package"
type: "export.splits"
impl: "I16-5.split_package"
inputs: ["feat_rows"]
outputs: ["train_pkg","val_pkg","test_pkg"]
splits: {train:0.8, validation:0.1, test:0.1}
policy: {leakage_guard:["per-object","per-timewindow"]}
- name: "monitor"
stages:
- name: "metrics.push"
type: "monitor.metrics"
impl: "I16-6.metrics_push"
inputs: ["feat_rows"]
params: {targets: ["qps","latency_ms.p99","error_rate"]}
edges:
- {from:"src.s3.pull:raw_blob", to:"schema.check:raw_blob"}
- {from:"schema.check:raw_rows", to:"normalize:raw_rows"}
- {from:"normalize:std_rows", to:"feat.map:std_rows"}
- {from:"feat.map:feat_rows", to:"split.package:feat_rows"}
resources: {cpu: 8, mem_gb: 32, gpu: 0, net_mbps: 800}
scheduling: {orchestrator:"airflow", cron:"5 * * * *"}
quality_gates: ["schema.ok","dq.thresholds","privacy.checks","splits.frozen"]
metrology: {units:"SI", check_dim:true}
export_manifest:
version: "v1.0"
references:
- "EFT.WP.Core.DataSpec v1.0:EXPORT"
- "EFT.WP.Core.Metrology v1.0:check_dim"
- "EFT.WP.Data.DatasetCards v1.0:Ch.11"
artifacts:
- {path:"pipeline.yaml", sha256:"..."}
- {path:"dq/report.jsonl", sha256:"..."}
- {path:"metrics/runtime.csv", sha256:"..."}
VIII. 约束与 Lint 规则(节选,规范性)
lint_rules:
- id: STRUCT.LAYERS_NOT_EMPTY
when: "$.pipeline.layers"
assert: "len(value) > 0"
level: error
- id: TOPOLOGY.EDGES_COMPAT
when: "$.pipeline.edges[*]"
assert: "schema_compat(edge.from.Σ_out, edge.to.Σ_in)"
level: error
- id: SPLITS.SUM_TO_ONE
when: "$.pipeline.layers[*].stages[*].splits"
assert: "abs(train+validation+test - 1) <= 1e-6"
level: error
- id: LEAKAGE.GUARDS_PRESENT
when: "$.pipeline.layers[*].stages[*].policy.leakage_guard"
assert: "contains_any(['per-object','per-timewindow','per-scene'])"
level: error
- id: METROLOGY.SI_AND_CHECKDIM
when: "$.pipeline.metrology"
assert: "units == 'SI' and check_dim == true"
level: error
- id: RETRIES.TIMEOUT_DEFINED
when: "$.pipeline.layers[*].stages[*]"
assert: "idempotent == true and retries.max >= 0 and timeout_s > 0"
level: warn
IX. 计量与路径量(如适用)
- 性能与资源:统一使用 SI;例如 QPS(1/s)、T_inf(ms)、C_cpu(core)、C_mem(GiB)、P(W)。
- 路径量 T_arr:若任何转换/校正涉及到达时,必须在对应 stage 中登记:
- delta_form:"const-factor|general"、path:"gamma(ell)"、measure:"d ell";
- 两种等价表达之一:
- T_arr = ( 1 / c_ref ) * ( ∫ n_eff d ell )
- T_arr = ( ∫ ( n_eff / c_ref ) d ell );
并通过 check_dim 校核。
X. 导出与审计(export_manifest)
- 必须包含:pipeline.yaml、质量门报告、运行指标与(如有)回放日志;references[] 使用“卷名 vX.Y:锚点”;所有工件含 sha256。
- 发布条件:lint 零阻断、DQ 通过、隐私/区域合规通过、切分冻结与泄漏护栏生效、SLA 达标。
XI. 本章合规自检
- layers[]/edges[] 定义完整,拓扑可解析且无未连接节点;Σ_in/Σ_out 兼容。
- 质量门、幂等/重试/超时策略明确;阻断与预警分级清晰。
- 资源与调度配置齐备,SLA/SLO 与告警规则生效。
- metrology.units="SI" 且 check_dim=true;性能/资源单位一致;如涉 T_arr 已登记 delta_form/path/measure。
- export_manifest 列出引用锚点与全部工件 sha256,splits 冻结且泄漏护栏到位。
版权与许可(CC BY 4.0)
版权声明:除另有说明外,《能量丝理论》(含文本、图表、插图、符号与公式)的著作权由作者(“屠广林”先生)享有。
许可方式:本作品采用 Creative Commons 署名 4.0 国际许可协议(CC BY 4.0)进行许可;在注明作者与来源的前提下,允许为商业或非商业目的进行复制、转载、节选、改编与再分发。
署名格式(建议):作者:“屠广林”;作品:《能量丝理论》;来源:energyfilament.org;许可证:CC BY 4.0。
首次发布: 2025-11-11|当前版本:v5.1
协议链接:https://creativecommons.org/licenses/by/4.0/