Contents Document - Technical Whitepaper 45 - EFT.WP.Data.Pipeline v1.0

Chapter 18 Appendix: Pipeline Templates


I. Template Scope and Conventions

Two ready-to-persist YAML/JSON templates are frozen in this appendix: a minimal template for quick starts and a full skeleton template for release-grade use. Key names use snake_case throughout; cross-volume references take the form "Volume vX.Y:anchor"; the unit system follows SI with check_dim=true; splits involved in evaluation or export must always use the full skeleton template and freeze their split indices.
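A minimal sketch of how the key-naming convention could be checked in CI follows; PyYAML, the file name pipeline.yaml, and the whitelist caveat are illustrative assumptions, not part of the frozen spec.

import re
import yaml  # assumption: PyYAML is available

SNAKE_CASE = re.compile(r"^[a-z][a-z0-9_]*$")

def check_keys(node, path=""):
    """Recursively verify that every mapping key is snake_case.
    Data-level labels (e.g. strata bucket names such as "A"/"B") may
    legitimately break the rule and would need a whitelist."""
    problems = []
    if isinstance(node, dict):
        for key, value in node.items():
            if not SNAKE_CASE.match(str(key)):
                problems.append(f"non-snake_case key at {path}/{key}")
            problems.extend(check_keys(value, f"{path}/{key}"))
    elif isinstance(node, list):
        for i, item in enumerate(node):
            problems.extend(check_keys(item, f"{path}[{i}]"))
    return problems

if __name__ == "__main__":
    with open("pipeline.yaml") as f:
        issues = check_keys(yaml.safe_load(f))
    print("\n".join(issues) or "key naming OK")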

II. Minimal Template (Copy-Ready)

pipeline:
  id: "eift.ingest-validate-transform-export"
  version: "v1.0"
  layers:
    - name: "ingest"
      stages:
        - name: "src.s3.pull"
          type: "source.s3"
          impl: "I16-1.s3_pull"
          params:
            endpoint: "https://s3.amazonaws.com"
            bucket_or_db: "eift-data"
            prefix_or_table: "raw/2025/09/"
            query_or_pattern: "*.jsonl"
            credentials_ref: "secrets://aws/ingest_ro"
            format: "json"
          outputs: ["raw_blob"]
          idempotent: true
          retries: {max: 3, backoff: "expo", jitter_ms: 200}
          timeout_s: 1800
    - name: "validate"
      stages:
        - name: "dq.scan"
          type: "validate.dq"
          impl: "I16-7.dq_scan"
          inputs: ["raw_blob"]
          outputs: ["dq_report"]
          schema_ref: "contracts/raw_json@v1.0"
          dq:
            sample: {rows: 100000, strategy: "stratified"}
            significance: {alpha: 0.05}
            gates:
              - {id: "DQ_001", kind: "not_null", cols: ["id", "ts"], level: "block"}
    - name: "transform"
      stages:
        - name: "standardize"
          type: "transform.normalize"
          impl: "I16-3.standardize"
          inputs: ["raw_blob"]
          outputs: ["std_rows"]
          params: {method: "zscore", stats_from: "train-only"}
          idempotent: true
    - name: "export"
      stages:
        - name: "split.package"
          type: "export.splits"
          impl: "I16-5.split_package"
          inputs: ["std_rows"]
          outputs: ["train_pkg", "val_pkg", "test_pkg"]
          splits:
            train: {ratio: 0.8}
            validation: {ratio: 0.1}
            test: {ratio: 0.1}
          policy:
            leakage_guard: ["per-object", "per-timewindow"]
            freeze_indices: true
  edges:
    - {from: "src.s3.pull:raw_blob", to: "dq.scan:raw_blob"}
    - {from: "dq.scan:dq_report", to: "standardize:raw_blob"}
    - {from: "standardize:std_rows", to: "split.package:std_rows"}
  metrology: {units: "SI", check_dim: true}

export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "pipeline.yaml", sha256: "<hex>"}
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"


III. Full Skeleton Template (Release-Grade, with Optional Extensions)

pipeline:
  id: "<org.project.pipeline>"
  version: "v1.0.0"
  orchestration:
    orchestrator: "airflow|argo|ray|custom"
    dag: {max_concurrency: 128, backfill: {enabled: true, window: "P7D"}}
    dependencies: []
    triggers:
      cron: "5 * * * *"
      # event: {source: "kafka", topic: "ds.ready", group: "pipeline-consumer"}
  scheduling:
    queue: "default"
    priority: 5
    preempt: true
    retries: {max: 3, backoff: "expo", jitter_ms: 200}
    timeout_s: 3600
    sla: {latency_ms: {p50: 5000, p95: 15000, p99: 30000}, availability: 0.999, error_rate: 0.01}
    alert_rules:
      - {name: "sla_breach_p99", rule: "latency_ms.p99>30000 for 10m", severity: "high"}
  resources:
    requests: {cpu: 4, mem_gb: 16, gpu: 0}
    limits: {cpu: 8, mem_gb: 32, gpu: 0}
    disk_gb: 200
    net_mbps: 800
    qos: "burstable"
  layers:
    - name: "ingest"
      stages:
        - name: "<src.kind.name>"
          type: "source.<s3|gcs|fs|db|kafka|http|custom>"
          impl: "I16-1.<impl_id>"
          params:
            endpoint: "<url-or-bootstrap>"
            bucket_or_db: "<bucket|db>"
            prefix_or_table: "<prefix|schema.table>"
            query_or_pattern: "<sql|glob>"
            credentials_ref: "secrets://path/to/credential"
            format: "<json|parquet|csv|avro|binary>"
            watermark: {field: "<updated_at|offset|lsn>", start: "<ISO8601|offset>", step: "<PT5M|1000>"}
            checkpoint: {path: "s3://.../chk/<stage>", mode: "exactly-once|at-least-once"}
            dedupe_key: ["<pk>", "<ts>"]
          outputs: ["raw_blob|raw_rows|events"]
          idempotent: true
          retries: {max: 3, backoff: "expo", jitter_ms: 200}
          timeout_s: 1800
          on_fail: "quarantine|skip|block"
    - name: "validate"
      stages:
        - name: "schema.check"
          type: "validate.schema"
          impl: "I16-2.schema_check"
          inputs: ["raw_blob"]
          outputs: ["raw_rows"]
          schema_ref: "contracts/raw_rows@vX.Y"
        - name: "dq.scan"
          type: "validate.dq"
          impl: "I16-7.dq_scan"
          inputs: ["raw_rows"]
          outputs: ["dq_report"]
          schema_ref: "contracts/raw_rows@vX.Y"
          dq:
            sample: {rows: 50000, strategy: "stratified"}
            significance: {alpha: 0.05}
            gates:
              - {id: "DQ_001", kind: "not_null", cols: ["id", "ts"], level: "block"}
              - {id: "DQ_002", kind: "unique", cols: [["id", "ts"]], level: "block"}
    - name: "transform"
      stages:
        - name: "normalize"
          type: "transform.normalize"
          impl: "I16-3.standardize"
          inputs: ["raw_rows"]
          outputs: ["std_rows"]
          params: {method: "zscore", stats_from: "train-only"}
          idempotent: true
          schema_ref: "contracts/std_rows@vX.Y"
    - name: "feature"
      stages:
        - name: "feat.map"
          type: "feature.map"
          impl: "I16-4.feature_map"
          inputs: ["std_rows"]
          outputs: ["feat_rows"]
          params:
            key: ["entity_id", "ts"]
            point_in_time: {enabled: true, lookback: "P30D", tolerance: "PT5M"}
            aggregate: {window: "P1D", funcs: ["mean", "std", "count"], fillna: {method: "pad"}}
          idempotent: true
          schema_ref: "contracts/feat_rows@vX.Y"
          feature_space: {type: "tabular", shape: "(N,D)", dtype: "float32", normalization: "zscore"}
    - name: "export"
      stages:
        - name: "split.package"
          type: "export.splits"
          impl: "I16-5.split_package"
          inputs: ["feat_rows"]
          outputs: ["train_pkg", "val_pkg", "test_pkg"]
          splits:
            train: {ratio: 0.8}
            validation: {ratio: 0.1}
            test: {ratio: 0.1}
          policy:
            sampling:
              strategy: "random|stratified|time-based|spatial-tiles|systematic"
              strata: [{by: "class|region|snr_bin", buckets: {"A": 100, "B": 200}}]
            leakage_guard: ["per-object", "per-timewindow", "per-scene"]
            freeze_indices: true
          distribution:
            packaging: {format: "tgz|parquet|zarr", shard_bytes: 134217728, layout: ["train", "validation", "test"]}
            mirrors: ["https://mirror-a.example/ds/foo/", "s3://bucket/foo/"]
            rate_limit: {mbps: 50}
          checksums:
            package: {sha256: "<hex>"}
            shards:
              - {path: "train-000.tgz", sha256: "<hex>"}
  edges: []
  monitoring:
    metrics:
      perf:
        - {name: "qps", unit: "1/s", agg: "sum", window: "1m"}
        - {name: "latency_ms.p99", unit: "ms", agg: "quant", window: "1m"}
  metrology: {units: "SI", check_dim: true}

export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "pipeline.yaml", sha256: "<hex>"}
    - {path: "contracts/raw_rows.schema.json", sha256: "<hex>"}
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"


IV. Field Placeholders and Minimal Regexes (Quick Reference)
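As a hedged quick-reference sketch, the mapping below collects minimal regexes for the placeholder kinds that actually appear in the templates of this chapter (sha256 digests, version strings, ISO 8601 durations, secrets:// references, schema contracts, cross-volume references). The patterns are illustrative and deliberately loose, not normative.

import re

# Illustrative, intentionally loose patterns for placeholders used in the templates.
PLACEHOLDER_PATTERNS = {
    "sha256_hex": re.compile(r"^[0-9a-f]{64}$"),                        # <hex> in export_manifest
    "version": re.compile(r"^v\d+\.\d+(\.\d+)?$"),                      # "v1.0", "v1.0.0"
    "iso8601_duration": re.compile(r"^P(?:\d+[YMWD])*(?:T(?:\d+[HMS])+)?$"),  # "P7D", "PT5M", "P30D"
    "secrets_ref": re.compile(r"^secrets://[A-Za-z0-9_\-./]+$"),        # credentials_ref
    "schema_ref": re.compile(r"^contracts/[A-Za-z0-9_]+@v\d+\.\d+$"),   # "contracts/raw_rows@vX.Y"
    "cross_volume_ref": re.compile(r"^[A-Za-z0-9.]+ v\d+\.\d+:[A-Za-z0-9_.]+$"),
}

def matches(kind: str, value: str) -> bool:
    return bool(PLACEHOLDER_PATTERNS[kind].fullmatch(value))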


V. Export Manifest Template (Normative)

export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "pipeline.yaml", sha256: "<hex>"}
    - {path: "splits/train.index", sha256: "<hex>"}
    - {path: "splits/validation.index", sha256: "<hex>"}
    - {path: "splits/test.index", sha256: "<hex>"}
    - {path: "packages/train-000.tgz", sha256: "<hex>"}
    - {path: "dq/report.jsonl", sha256: "<hex>"}
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
    - "EFT.WP.Data.DatasetCards v1.0:Ch.11"
    - "EFT.WP.Data.ModelCards v1.0:Ch.11"


VI. Pre-Release Blocking Self-Check (Checklist)
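As a hedged sketch, assuming the blocking conditions are exactly the ones the templates above make explicit (check_dim=true, freeze_indices=true on export splits, at least one block-level DQ gate, and no unfilled <hex> digests), a CI gate could look like the following; the file name and exit behaviour are assumptions.

import sys
import yaml  # assumption: PyYAML is available

with open("pipeline.yaml") as f:
    doc = yaml.safe_load(f)

pipe = doc["pipeline"]
blockers = []

# 1. Metrology: SI units and dimension checking must be on.
metrology = pipe.get("metrology", {})
if metrology.get("units") != "SI" or metrology.get("check_dim") is not True:
    blockers.append("metrology must set units: SI and check_dim: true")

stages = [s for layer in pipe.get("layers", []) for s in layer.get("stages", [])]

# 2. Export split stages must freeze their indices.
for s in stages:
    if s.get("type", "").startswith("export.") and not s.get("policy", {}).get("freeze_indices"):
        blockers.append(f"{s['name']}: policy.freeze_indices must be true")

# 3. At least one block-level DQ gate must exist.
gates = [g for s in stages for g in s.get("dq", {}).get("gates", [])]
if not any(g.get("level") == "block" for g in gates):
    blockers.append("no block-level DQ gate defined")

# 4. Every manifest artifact needs a real digest, not the <hex> placeholder.
for artifact in doc.get("export_manifest", {}).get("artifacts", []):
    if artifact.get("sha256", "<hex>") == "<hex>":
        blockers.append(f"unfilled sha256 for {artifact.get('path')}")

sys.exit("BLOCKED:\n" + "\n".join(blockers) if blockers else 0)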


VII. Machine-Readable Blank Template (Comment-Free, CI-Friendly)

pipeline:
  id: ""
  version: "v1.0"
  orchestration: {orchestrator: "airflow", dag: {max_concurrency: 64, backfill: {enabled: false}}}
  scheduling: {queue: "default", priority: 5, preempt: true, retries: {max: 3, backoff: "expo", jitter_ms: 200}, timeout_s: 3600}
  resources: {requests: {cpu: 1, mem_gb: 4, gpu: 0}, limits: {cpu: 2, mem_gb: 8, gpu: 0}, disk_gb: 50, net_mbps: 200, qos: "burstable"}
  layers: []
  edges: []
  monitoring: {metrics: {}, logs: {format: "jsonl", retention: "P30D"}}
  metrology: {units: "SI", check_dim: true}

export_manifest: {version: "v1.0", artifacts: [], references: ["EFT.WP.Core.DataSpec v1.0:EXPORT", "EFT.WP.Core.Metrology v1.0:check_dim"]}


Copyright and License (CC BY 4.0)

Copyright notice: Unless otherwise stated, the copyright in Energy Filament Theory (《能量丝理论》), including its text, figures, illustrations, symbols, and formulas, is held by the author, Mr. 屠广林.
License: This work is licensed under the Creative Commons Attribution 4.0 International License (CC BY 4.0); provided the author and source are credited, it may be reproduced, republished, excerpted, adapted, and redistributed for commercial or non-commercial purposes.
Suggested attribution: Author: 屠广林; Work: Energy Filament Theory (《能量丝理论》); Source: energyfilament.org; License: CC BY 4.0.

First published: 2025-11-11 | Current version: v5.1
License link: https://creativecommons.org/licenses/by/4.0/