目录 / 文档-技术白皮书 / 45-EFT.WP.Data.Pipeline v1.0
I. 模板范围与口径
提供完整骨架模板与最小模板两套可直接落盘的 YAML/JSON 模板;键名统一 snake_case;跨卷引用采用“卷名 vX.Y:锚点”;单位体系遵循 SI 与 check_dim=true;评测/导出相关切分一律使用冻结索引。
II. 最小模板(可直接复制)
# Minimal pipeline template: ingest -> validate -> transform -> export.
# NOTE(review): the source text had lost its indentation; the nesting below is
# reconstructed from key order -- confirm against the pipeline schema.
pipeline:
  id: "eift.ingest-validate-transform-export"
  version: "v1.0"
  layers:
    - name: "ingest"
      stages:
        - name: "src.s3.pull"
          type: "source.s3"
          impl: "I16-1.s3_pull"
          params:
            endpoint: "https://s3.amazonaws.com"
            bucket_or_db: "eift-data"
            prefix_or_table: "raw/2025/09/"
            # Quoted on purpose: an unquoted leading "*" is a YAML alias sigil.
            query_or_pattern: "*.jsonl"
            # Reference to a secret-store entry; no plaintext keys in the file.
            credentials_ref: "secrets://aws/ingest_ro"
            format: "json"
          outputs: ["raw_blob"]
          idempotent: true
          retries: {max: 3, backoff: "expo", jitter_ms: 200}
          timeout_s: 1800
    - name: "validate"
      stages:
        - name: "dq.scan"
          type: "validate.dq"
          impl: "I16-7.dq_scan"
          inputs: ["raw_blob"]
          outputs: ["dq_report"]
          schema_ref: "contracts/raw_json@v1.0"
          dq:
            sample: {rows: 100000, strategy: "stratified"}
            significance: {alpha: 0.05}
            gates:
              - {id: "DQ_001", kind: "not_null", cols: ["id", "ts"], level: "block"}
    - name: "transform"
      stages:
        - name: "standardize"
          type: "transform.normalize"
          impl: "I16-3.standardize"
          inputs: ["raw_blob"]
          outputs: ["std_rows"]
          params: {method: "zscore", stats_from: "train-only"}
          idempotent: true
    - name: "export"
      stages:
        - name: "split.package"
          type: "export.splits"
          impl: "I16-5.split_package"
          inputs: ["std_rows"]
          outputs: ["train_pkg", "val_pkg", "test_pkg"]
          # Ratios must sum to 1 (0.8 + 0.1 + 0.1).
          splits:
            train: {ratio: 0.8}
            validation: {ratio: 0.1}
            test: {ratio: 0.1}
          policy:
            leakage_guard: ["per-object", "per-timewindow"]
            freeze_indices: true
  # Each edge is "<stage>:<port>" -> "<stage>:<port>"; port names must match
  # the outputs/inputs declared by the stages above.
  edges:
    - {from: "src.s3.pull:raw_blob", to: "dq.scan:raw_blob"}
    # standardize consumes raw_blob, which only src.s3.pull produces; the
    # previous wiring fed dq_report into the raw_blob port.
    # NOTE(review): if dq.scan must gate standardize, express that ordering
    # with whatever dependency mechanism the orchestrator provides.
    - {from: "src.s3.pull:raw_blob", to: "standardize:raw_blob"}
    - {from: "standardize:std_rows", to: "split.package:std_rows"}
metrology: {units: "SI", check_dim: true}
export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "pipeline.yaml", sha256: "<hex>"}
  # Cross-volume references use the form "<volume> vX.Y:<anchor>".
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
III. 完整骨架模板(发布级,含可选扩展)
# Release-grade skeleton with optional extensions. Values written as "<...>"
# or "a|b|c" are placeholders to be replaced before use.
# NOTE(review): the source text had lost its indentation; the nesting below is
# reconstructed from key order -- confirm against the pipeline schema.
pipeline:
  id: "<org.project.pipeline>"
  version: "v1.0.0"
  orchestration:
    orchestrator: "airflow|argo|ray|custom"
    dag: {max_concurrency: 128, backfill: {enabled: true, window: "P7D"}}
    # NOTE(review): dependencies/triggers are assumed to belong to
    # orchestration -- confirm.
    dependencies: []
    triggers:
      cron: "5 * * * *"
      # event: {source: "kafka", topic: "ds.ready", group: "pipeline-consumer"}
  scheduling:
    queue: "default"
    priority: 5
    preempt: true
    retries: {max: 3, backoff: "expo", jitter_ms: 200}
    timeout_s: 3600
  # NOTE(review): sla and alert_rules are placed at pipeline level as an
  # assumption; they may instead belong under scheduling or monitoring.
  sla:
    latency_ms: {p50: 5000, p95: 15000, p99: 30000}
    availability: 0.999
    error_rate: 0.01
  alert_rules:
    - name: "sla_breach_p99"
      rule: "latency_ms.p99>30000 for 10m"
      severity: "high"
  resources:
    requests: {cpu: 4, mem_gb: 16, gpu: 0}
    limits: {cpu: 8, mem_gb: 32, gpu: 0}
    disk_gb: 200
    net_mbps: 800
    qos: "burstable"
  layers:
    - name: "ingest"
      stages:
        - name: "<src.kind.name>"
          type: "source.<s3|gcs|fs|db|kafka|http|custom>"
          impl: "I16-1.<impl_id>"
          params:
            endpoint: "<url-or-bootstrap>"
            bucket_or_db: "<bucket|db>"
            prefix_or_table: "<prefix|schema.table>"
            query_or_pattern: "<sql|glob>"
            # Reference to a secret-store entry; no plaintext keys in the file.
            credentials_ref: "secrets://path/to/credential"
            format: "<json|parquet|csv|avro|binary>"
            # NOTE(review): watermark/checkpoint/dedupe_key are assumed to be
            # source params; they may be stage-level keys instead -- confirm.
            watermark:
              field: "<updated_at|offset|lsn>"
              start: "<ISO8601|offset>"
              step: "<PT5M|1000>"
            checkpoint:
              path: "s3://.../chk/<stage>"
              mode: "exactly-once|at-least-once"
            dedupe_key: ["<pk>", "<ts>"]
          outputs: ["raw_blob|raw_rows|events"]
          idempotent: true
          retries: {max: 3, backoff: "expo", jitter_ms: 200}
          timeout_s: 1800
          on_fail: "quarantine|skip|block"
    - name: "validate"
      stages:
        - name: "schema.check"
          type: "validate.schema"
          impl: "I16-2.schema_check"
          inputs: ["raw_blob"]
          outputs: ["raw_rows"]
          schema_ref: "contracts/raw_rows@vX.Y"
        - name: "dq.scan"
          type: "validate.dq"
          impl: "I16-7.dq_scan"
          inputs: ["raw_rows"]
          outputs: ["dq_report"]
          schema_ref: "contracts/raw_rows@vX.Y"
          dq:
            sample: {rows: 50000, strategy: "stratified"}
            significance: {alpha: 0.05}
            gates:
              - {id: "DQ_001", kind: "not_null", cols: ["id", "ts"], level: "block"}
              # cols holds one nested list here, i.e. a single composite key.
              - {id: "DQ_002", kind: "unique", cols: [["id", "ts"]], level: "block"}
    - name: "transform"
      stages:
        - name: "normalize"
          type: "transform.normalize"
          impl: "I16-3.standardize"
          inputs: ["raw_rows"]
          outputs: ["std_rows"]
          params: {method: "zscore", stats_from: "train-only"}
          idempotent: true
          schema_ref: "contracts/std_rows@vX.Y"
    - name: "feature"
      stages:
        - name: "feat.map"
          type: "feature.map"
          impl: "I16-4.feature_map"
          inputs: ["std_rows"]
          outputs: ["feat_rows"]
          params:
            key: ["entity_id", "ts"]
            point_in_time: {enabled: true, lookback: "P30D", tolerance: "PT5M"}
            aggregate:
              window: "P1D"
              funcs: ["mean", "std", "count"]
              fillna: {method: "pad"}
          idempotent: true
          schema_ref: "contracts/feat_rows@vX.Y"
          feature_space:
            type: "tabular"
            shape: "(N,D)"
            dtype: "float32"
            normalization: "zscore"
    - name: "export"
      stages:
        - name: "split.package"
          type: "export.splits"
          impl: "I16-5.split_package"
          inputs: ["feat_rows"]
          outputs: ["train_pkg", "val_pkg", "test_pkg"]
          # Ratios must sum to 1 (0.8 + 0.1 + 0.1).
          splits:
            train: {ratio: 0.8}
            validation: {ratio: 0.1}
            test: {ratio: 0.1}
          policy:
            sampling:
              strategy: "random|stratified|time-based|spatial-tiles|systematic"
              strata:
                - by: "class|region|snr_bin"
                  buckets: {"A": 100, "B": 200}
            leakage_guard: ["per-object", "per-timewindow", "per-scene"]
            freeze_indices: true
          # NOTE(review): distribution is assumed to be a key of this stage and
          # checksums a key of distribution -- confirm.
          distribution:
            packaging:
              format: "tgz|parquet|zarr"
              shard_bytes: 134217728
              layout: ["train", "validation", "test"]
            mirrors: ["https://mirror-a.example/ds/foo/", "s3://bucket/foo/"]
            rate_limit: {mbps: 50}
            checksums:
              package: {sha256: "<hex>"}
              shards:
                - {path: "train-000.tgz", sha256: "<hex>"}
  # Left empty in the skeleton; fill with "<stage>:<port>" -> "<stage>:<port>"
  # entries matching the stage outputs/inputs declared above.
  edges: []
  monitoring:
    metrics:
      perf:
        - {name: "qps", unit: "1/s", agg: "sum", window: "1m"}
        - {name: "latency_ms.p99", unit: "ms", agg: "quant", window: "1m"}
metrology: {units: "SI", check_dim: true}
export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "pipeline.yaml", sha256: "<hex>"}
    - {path: "contracts/raw_rows.schema.json", sha256: "<hex>"}
  # Cross-volume references use the form "<volume> vX.Y:<anchor>".
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
IV. 字段占位符与最小正则(速查)
- pipeline.id: ^[a-z0-9_\\-\\.]+$;pipeline.version: ^v\\d+\\.\\d+(\\.\\d+)?$;
- export_manifest.references[*]: ^[^:]+ v\\d+\\.\\d+:[A-Za-z].+$;
- splits 比例和:1±1e-6;policy.freeze_indices:true;leakage_guard 含 per-object|per-timewindow|per-scene 至少一项;
- 计量:metrology.units="SI" 与 check_dim=true。
V. 导出清单模板(规范性)
# Normative export manifest: every listed artifact carries a sha256 digest
# ("<hex>" is a placeholder for the real value).
export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "pipeline.yaml", sha256: "<hex>"}
    - {path: "splits/train.index", sha256: "<hex>"}
    - {path: "splits/validation.index", sha256: "<hex>"}
    - {path: "splits/test.index", sha256: "<hex>"}
    - {path: "packages/train-000.tgz", sha256: "<hex>"}
    - {path: "dq/report.jsonl", sha256: "<hex>"}
  # Cross-volume references use the form "<volume> vX.Y:<anchor>".
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
    - "EFT.WP.Data.DatasetCards v1.0:Ch.11"
    - "EFT.WP.Data.ModelCards v1.0:Ch.11"
VI. 发布前阻断自检(清单)
- 结构/必填:pipeline.id/version/layers/edges 与 metrology/export_manifest 齐备,Schema 校验通过。
- 引用/版本:export_manifest.references[] 使用“卷名 vX.Y:锚点”,无短码/缺版本。
- 计量/单位:units="SI"、check_dim=true;性能/网络/存储等单位一致。
- 拓扑/切分/泄漏:Σ_out→Σ_in 兼容;切分比例和为 1、索引冻结、泄漏护栏到位。
- 安全/凭据:源阶段仅使用 credentials_ref;未出现明文密钥;访问与网络限制配置生效。
- 工件可验:export_manifest 中所有文件具 sha256 并可复现。
VII. 机器可读空白模板(无注释版,CI 友好)
pipeline:
  id: ""
  version: "v1.0"
  orchestration:
    orchestrator: "airflow"
    dag: {max_concurrency: 64, backfill: {enabled: false}}
  scheduling:
    queue: "default"
    priority: 5
    preempt: true
    retries: {max: 3, backoff: "expo", jitter_ms: 200}
    timeout_s: 3600
  resources:
    requests: {cpu: 1, mem_gb: 4, gpu: 0}
    limits: {cpu: 2, mem_gb: 8, gpu: 0}
    disk_gb: 50
    net_mbps: 200
    qos: "burstable"
  layers: []
  edges: []
  monitoring:
    metrics: {}
    logs: {format: "jsonl", retention: "P30D"}
metrology: {units: "SI", check_dim: true}
export_manifest:
  version: "v1.0"
  artifacts: []
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
版权与许可(CC BY 4.0)
版权声明:除另有说明外,《能量丝理论》(含文本、图表、插图、符号与公式)的著作权由作者(“屠广林”先生)享有。
许可方式:本作品采用 Creative Commons 署名 4.0 国际许可协议(CC BY 4.0)进行许可;在注明作者与来源的前提下,允许为商业或非商业目的进行复制、转载、节选、改编与再分发。
署名格式(建议):作者:“屠广林”;作品:《能量丝理论》;来源:energyfilament.org;许可证:CC BY 4.0。
首次发布: 2025-11-11|当前版本:v5.1
协议链接:https://creativecommons.org/licenses/by/4.0/