45-EFT.WP.Data.Pipeline v1.0 (Technical White Paper)
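I. Chapter Purpose and Scope
This chapter provides the pipeline's normative JSON Schema and Lint rule set, covering structure, types, regular expressions, dependencies, reference anchors, metrology checks, idempotency and retries, frozen splits and leakage guards, and minimal security and compliance checks. The outputs are used for pre-release blocking checks and automated portal validation. Key names use snake_case throughout; cross-volume references take the form "VolumeName vX.Y:anchor"; mathematical expressions are written in backticks with parentheses and must not contain Chinese characters.
II. Normative Artifacts (Required for Release)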
artifacts:
- path: "schema/pipeline.schema.json"
- path: "schema/lint_rules.yaml"
- path: "schema/examples/minimal.yaml"
- path: "schema/examples/full.yaml"
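The artifacts above must be registered in export_manifest.artifacts[] with a sha256 digest, and their reference-anchor conventions must match this volume.
III. Normative JSON Schema (Core Excerpt)
The references[] regex enforces the "VolumeName vX.Y:anchor" form; metrology.units="SI" and check_dim=true are mandatory.
IV. Lint Rules (Normative)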
version: "v1.0"
rules:
  # Structure and version
  - id: STRUCT.REQUIRED
    when: "$"
    assert: "has_keys(pipeline, metrology, export_manifest)"
    level: error
  - id: VERSION.SEMVER
    when: "$.pipeline.version"
    assert: "matches('^v\\d+\\.\\d+(\\.\\d+)?$')"
    level: error
  # Topology and contracts
  - id: LAYERS.NOT_EMPTY
    when: "$.pipeline.layers"
    assert: "len(value) > 0"
    level: error
  - id: EDGES.COMPAT_SCHEMA
    when: "$.pipeline.edges[*]"
    assert: "schema_compat(edge.from.Σ_out, edge.to.Σ_in)"
    level: error
  # Sampling and splits
  - id: SPLIT.RATIO_SUM
    when: "$..stages[?(@.type=='export.splits')].splits"
    assert: "abs(train.ratio + validation.ratio + test.ratio - 1) <= 1e-6"
    level: error
  - id: SPLIT.FREEZE_REQUIRED
    when: "$..stages[?(@.type=='export.splits')].policy.freeze_indices"
    assert: "value == true"
    level: error
  - id: LEAKAGE.GUARDS_PRESENT
    when: "$..stages[?(@.type=='export.splits')].policy.leakage_guard"
    assert: "contains_any(['per-object','per-timewindow','per-scene'])"
    level: error
  # Validation and quality gates
  - id: DQ.SCHEMA_REF_REQUIRED
    when: "$..stages[?(@.type=='validate.dq')]"
    assert: "has_key('schema_ref')"
    level: error
  - id: DQ.SAMPLE_DEFINED
    when: "$..stages[?(@.type=='validate.dq')].dq.sample"
    assert: "value.rows > 0 and value.strategy in ['head','random','stratified']"
    level: error
  # Transforms and features
  - id: TF.IDEMPOTENT_REQUIRED
    when: "$..stages[?(@.type^='transform.')]"
    assert: "idempotent == true"
    level: error
  - id: FEAT.FS_REQUIRED
    when: "$..stages[?(@.type^='feature.')]"
    assert: "has_key('feature_space')"
    level: error
  # Minimal security and compliance checks
  - id: SEC.CREDENTIALS_REF
    when: "$..stages[?(@.type^='source.')].params"
    assert: "has_key('credentials_ref') and not has_key('plain_secret')"
    level: error
  - id: PRIV.MINIMIZATION_ON
    when: "$.privacy.data_minimization"
    assert: "value == true"
    level: error
  # Metrology
  - id: METROLOGY.SI_AND_CHECKDIM
    when: "$.metrology"
    assert: "units == 'SI' and check_dim == true"
    level: error
  # Reference anchors (the first anchor character may be either case, so that ":check_dim" passes)
  - id: REFERENCES.FORMAT
    when: "$.export_manifest.references[*]"
    assert: "matches('^[^:]+ v\\d+\\.\\d+:[A-Za-z].+$')"
    level: error
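As an illustration of how the split-related rules above might be evaluated, the following is a minimal Python sketch, not the normative lint engine. The function names `iter_stages`, `finding`, and `lint_splits` are hypothetical. It walks every `stages` list in the spec (mirroring the `$..stages` selector) and, as an assumption, treats a missing `policy.freeze_indices` or `policy.leakage_guard` as a failure rather than as a rule that does not fire.

```python
LEAKAGE_GUARDS = {"per-object", "per-timewindow", "per-scene"}


def iter_stages(node, path="$"):
    """Yield (path, stage) for each dict inside any `stages` list, at any depth."""
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}.{key}"
            if key == "stages" and isinstance(value, list):
                for i, stage in enumerate(value):
                    if isinstance(stage, dict):
                        yield f"{child}[{i}]", stage
            yield from iter_stages(value, child)
    elif isinstance(node, list):
        for i, item in enumerate(node):
            yield from iter_stages(item, f"{path}[{i}]")


def finding(rule, path, message, fix):
    """Build one diagnostic record (hypothetical helper)."""
    return {"rule": rule, "level": "error", "path": path,
            "message": message, "fix": fix}


def lint_splits(spec):
    """Check SPLIT.RATIO_SUM, SPLIT.FREEZE_REQUIRED and LEAKAGE.GUARDS_PRESENT."""
    findings = []
    for path, stage in iter_stages(spec):
        if stage.get("type") != "export.splits":
            continue
        splits = stage.get("splits", {})
        total = sum(splits.get(name, {}).get("ratio", 0.0)
                    for name in ("train", "validation", "test"))
        if abs(total - 1) > 1e-6:
            findings.append(finding(
                "SPLIT.RATIO_SUM", f"{path}.splits",
                f"split ratios sum to {total:.6f}, expected 1",
                "Normalize ratios so they sum to 1±1e-6"))
        policy = stage.get("policy", {})
        # Assumption: an absent key counts as a violation.
        if policy.get("freeze_indices") is not True:
            findings.append(finding(
                "SPLIT.FREEZE_REQUIRED", f"{path}.policy.freeze_indices",
                "split indices are not frozen",
                "Set policy.freeze_indices to true"))
        guard = policy.get("leakage_guard")
        guards = {guard} if isinstance(guard, str) else set(guard or [])
        if not guards & LEAKAGE_GUARDS:
            findings.append(finding(
                "LEAKAGE.GUARDS_PRESENT", f"{path}.policy.leakage_guard",
                "no recognized leakage guard is declared",
                "Declare per-object, per-timewindow or per-scene"))
    return findings
```

A spec whose `export.splits` stage declares ratios 0.7/0.2/0.1, `freeze_indices: true`, and at least one recognized guard yields an empty list under this sketch.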
STRUCT.REQUIRED, VERSION.SEMVER, EDGES.COMPAT_SCHEMA, SPLIT.*, TF.IDEMPOTENT_REQUIRED, FEAT.FS_REQUIRED, SEC.CREDENTIALS_REF, METROLOGY.SI_AND_CHECKDIM, and REFERENCES.FORMAT are blocking items.
V. Failure Examples and Diagnostics (Excerpt)
fail_examples:
  - case: "bad reference format"
    input: {export_manifest: {references: ["Core.DataSpec:EXPORT"]}}
    expect: {rule: "REFERENCES.FORMAT", level: "error",
             fix: "Use 'EFT.WP.Core.DataSpec v1.0:EXPORT'"}
  - case: "split ratios sum != 1"
    input: {stages: [{type: "export.splits", splits: {train: {ratio: 0.7}, validation: {ratio: 0.2}, test: {ratio: 0.2}}}]}
    expect: {rule: "SPLIT.RATIO_SUM", level: "error",
             fix: "Normalize ratios so they sum to 1±1e-6"}
  - case: "no credentials_ref"
    input: {stages: [{type: "source.s3", params: {endpoint: "...", plain_secret: "abc"}}]}
    expect: {rule: "SEC.CREDENTIALS_REF", level: "error",
             fix: "Remove plaintext secret; reference a secrets manager via credentials_ref"}
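Failure cases like those above could serve as regression fixtures for a lint implementation. The Python sketch below is one possible shape for such a harness, under stated assumptions: `check_references`, `check_credentials`, `run_case`, and `CHECKS` are hypothetical names, only two of the rules are implemented, the credentials check looks only at a top-level `stages` list as in the fixture, and the reference pattern admits a lowercase first anchor character so that references such as "EFT.WP.Core.Metrology v1.0:check_dim" pass. Each finding carries the rule, path, message, and fix fields.

```python
import re

# Assumption: the anchor may start with either case (see lead-in).
REFERENCE_RE = re.compile(r"^[^:]+ v\d+\.\d+:[A-Za-z].+$")


def check_references(spec):
    """REFERENCES.FORMAT: each export_manifest.references[] entry matches the pattern."""
    refs = spec.get("export_manifest", {}).get("references", [])
    return [
        {
            "rule": "REFERENCES.FORMAT",
            "level": "error",
            "path": f"$.export_manifest.references[{i}]",
            "message": f"reference {ref!r} is not of the form 'VolumeName vX.Y:anchor'",
            "fix": "Write the reference as 'VolumeName vX.Y:anchor'",
        }
        for i, ref in enumerate(refs)
        if not REFERENCE_RE.match(ref)
    ]


def check_credentials(spec):
    """SEC.CREDENTIALS_REF: source stages use credentials_ref and no plain_secret."""
    findings = []
    for i, stage in enumerate(spec.get("stages", [])):
        if not str(stage.get("type", "")).startswith("source."):
            continue
        params = stage.get("params", {})
        if "credentials_ref" in params and "plain_secret" not in params:
            continue
        findings.append({
            "rule": "SEC.CREDENTIALS_REF",
            "level": "error",
            "path": f"$.stages[{i}].params",
            "message": "source stage lacks credentials_ref or carries plain_secret",
            "fix": "Remove plaintext secret; reference a secrets manager via credentials_ref",
        })
    return findings


CHECKS = [check_references, check_credentials]


def run_case(case):
    """Return True when some finding matches the case's expected rule and level."""
    findings = [f for check in CHECKS for f in check(case["input"])]
    expect = case["expect"]
    return any(f["rule"] == expect["rule"] and f["level"] == expect["level"]
               for f in findings)


FAIL_EXAMPLES = [
    {"case": "bad reference format",
     "input": {"export_manifest": {"references": ["Core.DataSpec:EXPORT"]}},
     "expect": {"rule": "REFERENCES.FORMAT", "level": "error"}},
    {"case": "no credentials_ref",
     "input": {"stages": [{"type": "source.s3",
                           "params": {"endpoint": "...", "plain_secret": "abc"}}]},
     "expect": {"rule": "SEC.CREDENTIALS_REF", "level": "error"}},
]

if __name__ == "__main__":
    for case in FAIL_EXAMPLES:
        print(case["case"], "->", "detected" if run_case(case) else "MISSED")
```

Keeping the expected rule next to each input means a change to a rule that silently stops catching a known-bad input shows up as a missed case.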
Lint output must include four elements: rule, path, message, and fix.
VI. Minimal Working Example (passes Schema and Lint)
pipeline:
  id: "eift.ingest-validate-transform-export"
  version: "v1.0"
  layers:
    - name: "ingest"
      stages:
        - name: "src.s3.pull"
          type: "source.s3"
          impl: "I16-1.s3_pull"
          params: {endpoint: "https://s3.amazonaws.com", bucket_or_db: "eift-data",
                   prefix_or_table: "raw/2025/09/", query_or_pattern: "*.jsonl",
                   credentials_ref: "secrets://aws/ingest_ro", format: "json"}
          outputs: ["raw_blob"]
          idempotent: true
          retries: {max: 3, backoff: "expo", jitter_ms: 200}
          timeout_s: 1800
    - name: "validate"
      stages:
        - name: "dq.scan"
          type: "validate.dq"
          impl: "I16-7.dq_scan"
          inputs: ["raw_blob"]
          outputs: ["dq_report"]
          schema_ref: "contracts/raw_json@v1.2"
          dq: {sample: {rows: 100000, strategy: "stratified"}, significance: {alpha: 0.05},
               gates: [{id: "DQ_001", kind: "not_null", cols: ["id", "ts"], level: "block"}]}
  edges:
    - {from: "src.s3.pull:raw_blob", to: "dq.scan:raw_blob"}
metrology: {units: "SI", check_dim: true}
export_manifest:
  version: "v1.0"
  artifacts: [{path: "pipeline.yaml", sha256: "..."}]
  references: ["EFT.WP.Core.DataSpec v1.0:EXPORT", "EFT.WP.Core.Metrology v1.0:check_dim"]
VII. Coupling with the Export Manifest (Normative)
export_manifest:
  artifacts:
    - {path: "schema/pipeline.schema.json", sha256: "..."}
    - {path: "schema/lint_rules.yaml", sha256: "..."}
    - {path: "schema/examples/minimal.yaml", sha256: "..."}
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
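One way the registered digests might be checked before release is sketched below in Python using only the standard library. The function names `sha256_of` and `verify_artifacts`, the `root` parameter, and the shape of the returned error records are assumptions made for illustration; the source does not define them.

```python
import hashlib
from pathlib import Path


def sha256_of(path, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_artifacts(manifest, root="."):
    """Compare each artifacts[] entry against the file found under `root`.

    `manifest` is the export_manifest mapping. Returns a list of error
    records; an empty list means every artifact exists and matches.
    """
    errors = []
    for i, entry in enumerate(manifest.get("artifacts", [])):
        location = f"$.export_manifest.artifacts[{i}]"
        target = Path(root) / entry["path"]
        expected = str(entry.get("sha256", "")).lower()
        if not target.is_file():
            errors.append({"path": location,
                           "message": f"missing artifact: {entry['path']}"})
        elif sha256_of(target) != expected:
            errors.append({"path": location,
                           "message": f"sha256 mismatch: {entry['path']}"})
    return errors
```

Calling `verify_artifacts(spec["export_manifest"], root="release/")` (with a hypothetical `release/` directory) would report any listed file that is absent or whose content no longer matches its recorded digest.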
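The Schema and Lint artifacts are blocking items and must be listed and verifiable; references carry the "VolumeName vX.Y:anchor" form.
VIII. Validation Interface (implementation binding Ixx-?, unified return)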
def validate_pipeline(spec: dict) -> dict: ...
def lint_pipeline(spec: dict, rules: dict) -> dict: ...
def check_units(spec: dict) -> dict: ... # uses Core.Metrology v1.0:check_dim
def verify_references(spec: dict) -> dict: ...  # regex + anchor reachability
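To make the shared return shape concrete, here is a minimal Python sketch in which every check returns a mapping with `ok`, `errors`, `warnings`, and `metrics`. The helpers `make_result` and `merge_results` and the `checks_run` metric are hypothetical. This `check_units` only inspects the declared `metrology` block; it does not perform the dimensional analysis that Core.Metrology v1.0:check_dim would supply.

```python
def make_result(errors=(), warnings=(), metrics=None):
    """Build the unified result; `ok` is true only when there are no errors."""
    errors = list(errors)
    return {"ok": not errors, "errors": errors,
            "warnings": list(warnings), "metrics": dict(metrics or {})}


def check_units(spec: dict) -> dict:
    """Verify the metrology declaration: units == 'SI' and check_dim is true."""
    rule = "METROLOGY.SI_AND_CHECKDIM"
    metrology = spec.get("metrology")
    errors = []
    if not isinstance(metrology, dict):
        errors.append({"rule": rule, "path": "$.metrology",
                       "message": "metrology block is missing"})
    else:
        if metrology.get("units") != "SI":
            errors.append({"rule": rule, "path": "$.metrology.units",
                           "message": "units must be 'SI'"})
        if metrology.get("check_dim") is not True:
            errors.append({"rule": rule, "path": "$.metrology.check_dim",
                           "message": "check_dim must be true"})
    # `checks_run` is an illustrative metric, not one named by the source.
    return make_result(errors, metrics={"checks_run": 1})


def merge_results(*results):
    """Combine several unified results into one for a portal or CI gate."""
    errors = [e for r in results for e in r["errors"]]
    warnings = [w for r in results for w in r["warnings"]]
    checks = sum(r["metrics"].get("checks_run", 0) for r in results)
    return make_result(errors, warnings, {"checks_run": checks})
```

A CI job could then fail the build on `not merge_results(...)["ok"]` without needing to know which individual check produced each error.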
Each function returns {"ok": bool, "errors":[...], "warnings":[...], "metrics":{...}}, for use by the portal and CI.
IX. Chapter Compliance Self-Check
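- pipeline.schema.json and lint_rules.yaml have been generated and registered in export_manifest with sha256 digests.
- The Schema enforces metrology.units="SI", check_dim=true, and the references[] anchor regex; Lint blocks incompatible topology, unfrozen splits, missing leakage guards, undeclared idempotency, and plaintext secrets.
- Sampling, split, and distribution configuration is consistent with the data card; features are consistent with the I/O contracts and units of measurement.
- The minimal example passes Schema and Lint in a single run; the validation interface is integrated and returns the unified structure.
- All references use the "VolumeName vX.Y:anchor" form, with no short codes, aliases, or version-less references.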
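Copyright and License (CC BY 4.0)
Copyright notice: Unless otherwise stated, the copyright in Energy Filament Theory (《能量丝理论》), including its text, charts, illustrations, symbols, and formulas, belongs to the author, Mr. "屠广林".
License: This work is licensed under the Creative Commons Attribution 4.0 International License (CC BY 4.0). Copying, reposting, excerpting, adaptation, and redistribution are permitted for commercial or non-commercial purposes, provided the author and source are credited.
Suggested attribution: Author: "屠广林"; Work: Energy Filament Theory (《能量丝理论》); Source: energyfilament.org; License: CC BY 4.0.
First published: 2025-11-11 | Current version: v5.1
License link: https://creativecommons.org/licenses/by/4.0/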