目录 / 文档-技术白皮书 / 45-EFT.WP.Data.Pipeline v1.0
I. 章节目的与范围
的规范:编排后端与拓扑提交、优先级与抢占、触发与依赖、重试与超时、SLA/SLO 与告警、资源画像与配额、自动扩缩与成本计量;确保与数据契约、质量门、监控与计量章一致。资源(resources) 与 调度(scheduling)、编排(orchestration)固化流水线II. 术语与依赖
- 术语:orchestrator、dag、queue、priority、preempt、retry、timeout_s、cron、event trigger、sla/slo、qos、requests/limits、autoscale、budget/cost。
- 依赖:契约与导出(《Core.DataSpec v1.0》);单位/量纲与性能计量(《Core.Metrology v1.0》);质量门(《DatasetCards v1.0》)、评测协议(《ModelCards v1.0》)。
- 数学与符号:内联符号用反引号(如 QPS、T_inf、ρ、p99);含除号/积分/复合算符必须加括号;公式/符号/定义禁用中文。
III. 字段与结构(规范性)
orchestration:
orchestrator: "airflow|argo|ray|custom"
dag:
max_concurrency: 128
backfill: {enabled: true, window: "P7D"}
dependencies:
- {from:"validate.schema", to:"transform.normalize"}
- {from:"transform.normalize", to:"feature.map"}
triggers:
cron: "5 * * * *" # 或 event: {source:"kafka", topic:"topic-x"}
event: {source:"kafka", topic:"ds.ready", group:"pipeline-consumer"} # 可选
scheduling:
queue: "high|default|low"
priority: 5 # 1~10,高优先级优先
preempt: true
retries: {max: 3, backoff: "expo", jitter_ms: 200}
timeout_s: 3600
sla:
latency_ms: {p50: 5000, p95: 15000, p99: 30000}
availability: 0.999
error_rate: 0.01
alert_rules:
- {name:"sla_breach_p99", rule:"latency_ms.p99>30000 for 10m", severity:"high"}
resources:
requests: {cpu: 4, mem_gb: 16, gpu: 0}
limits: {cpu: 8, mem_gb: 32, gpu: 0}
disk_gb: 200
net_mbps: 800
qos: "burstable|guaranteed|best-effort"
autoscale:
enabled: true
policy:
metric: "qps|latency_ms.p95|cpu|custom"
target: 0.7 # 目标利用率或阈值
min_replicas: 2
max_replicas: 64
cooldown_s: 120
cost:
budget:
currency: "USD"
monthly_cap: 2000
pricing_refs:
compute: "pricing/compute@v1.0"
storage: "pricing/storage@v1.0"
egress: "pricing/egress@v1.0"
metrology:
units: "SI"
check_dim: true
IV. 编排后端与提交
- 后端选择:airflow|argo|ray|custom;需声明并发上限、队列与工作节点拓扑;支持 backfill 的历史重跑窗口与幂等保障。
- DAG 依赖:在 dependencies[] 明确阶段顺序与扇出/扇入;循环需在 Lint 放行并给出收敛条件。
- 触发:支持定时 cron 与事件 event;事件触发需声明来源、主题与消费组。
V. 调度策略与失败语义
- 队列与优先级:queue 与 priority 决定资源分配;启用 preempt 可抢占低优先级任务。
- 重试与超时:指数退避 + 抖动;显式 timeout_s;错误分类(可重试/不可重试/升级告警)。
- SLA/SLO:以 {latency_ms:{p50,p95,p99}, availability, error_rate} 固化;违约触发 alert_rules 与降级策略。
VI. 资源画像与配额
- 请求/上限:requests/limits 明确 CPU/内存/GPU;磁盘与网络带宽单独声明;qos 指示调度器分配策略。
- 隔离与亲和:可在实现层(Ixx-?)约定节点选择器/亲和/污点容忍等;跨可用区部署需给出带宽与故障域策略。
- 指标:统一以 SI 计量:QPS(1/s)、T_inf(ms)、ρ(—)、net_mbps、size_bytes。
VII. 自动扩缩与弹性
- 策略:HPA/自定义扩缩,根据 metric(如 qps、latency_ms.p95、cpu)对目标值 target 调节副本;冷却时间 cooldown_s 防抖。
- 边界:min_replicas/max_replicas 明确扩缩边界;与 sla 持续违约时触发预警或限流。
- 成本联动:扩缩策略需与 budget.monthly_cap 协同,避免超支。
VIII. 成本度量与预算
- 预算:货币单位与上限 monthly_cap;
- 价格引用:pricing_refs.* 指向版本化价格表;
- 报告:对计算/存储/外网出流量形成分项成本报告,纳入导出物。
IX. 计量与单位(SI)
- 强制:metrology:{units:"SI", check_dim:true};合成或换算前先做单位归一;
- 性能/资源:QPS(1/s)、T_inf(ms)、ρ(—),net_mbps、size_bytes、power_w(如适用)。
X. 机器可读片段(可直接嵌入)
orchestration:
orchestrator: "argo"
dag: {max_concurrency: 256, backfill:{enabled:true, window:"P3D"}}
dependencies:
- {from:"validate.schema", to:"transform.normalize"}
- {from:"transform.normalize", to:"feature.map"}
triggers:
cron: "5 * * * *"
scheduling:
queue: "high"
priority: 8
preempt: true
retries: {max:3, backoff:"expo", jitter_ms:200}
timeout_s: 5400
sla:
latency_ms: {p50:3000, p95:10000, p99:20000}
availability: 0.999
error_rate: 0.005
alert_rules:
- {name:"p99_breach", rule:"latency_ms.p99>20000 for 10m", severity:"high"}
resources:
requests: {cpu: 8, mem_gb: 32, gpu: 0}
limits: {cpu: 16, mem_gb: 64, gpu: 0}
disk_gb: 500
net_mbps: 1200
qos: "guaranteed"
autoscale:
enabled: true
policy: {metric:"qps", target:0.7, min_replicas:4, max_replicas:64, cooldown_s:120}
cost:
budget: {currency:"USD", monthly_cap: 5000}
pricing_refs: {compute:"pricing/compute@v1.0", storage:"pricing/storage@v1.0", egress:"pricing/egress@v1.0"}
metrology: {units:"SI", check_dim:true}
XI. Lint 规则(节选,规范性)
lint_rules:
- id: ORCH.ORCHESTRATOR_ALLOWED
when: "$.orchestration.orchestrator"
assert: "value in ['airflow','argo','ray','custom']"
level: error
- id: SCHED.TIMEOUT_DEFINED
when: "$.scheduling.timeout_s"
assert: "is_number(value) and value > 0"
level: error
- id: SCHED.RETRIES_VALID
when: "$.scheduling.retries"
assert: "value.max >= 0 and value.backoff in ['expo','linear']"
level: error
- id: SLA.METRICS_DEFINED
when: "$.scheduling.sla"
assert: "has_keys(latency_ms, availability, error_rate)"
level: error
- id: RES.REQUESTS_LIMITS
when: "$.resources"
assert: "has_keys(requests, limits) and requests.cpu <= limits.cpu and requests.mem_gb <= limits.mem_gb"
level: error
- id: AUTOSCALE.BOUNDS
when: "$.autoscale"
assert: "value.enabled == false or (value.policy.min_replicas >= 1 and value.policy.max_replicas >= value.policy.min_replicas)"
level: error
- id: METROLOGY.SI_AND_CHECKDIM
when: "$.metrology"
assert: "units == 'SI' and check_dim == true"
level: error
XII. 导出清单与审计
export_manifest:
version: "v1.0"
artifacts:
- {path:"orchestration/dag.yaml", sha256:"..."}
- {path:"scheduling/policies.yaml", sha256:"..."}
- {path:"resources/usage.report.csv", sha256:"..."}
- {path:"autoscale/history.csv", sha256:"..."}
- {path:"cost/monthly_report.csv", sha256:"..."}
references:
- "EFT.WP.Core.DataSpec v1.0:EXPORT"
- "EFT.WP.Core.Metrology v1.0:check_dim"
XIII. 本章合规自检
- 编排后端、DAG 并发与依赖、触发方式完整;循环拓扑已在 Lint 放行并具收敛条件。
- 队列/优先级/抢占、重试/超时、SLA 与告警规则明确并生效。
- 资源 requests/limits 与 qos 合理;自动扩缩边界与目标阈值设定且与预算协同。
- 计量采用 SI 且 check_dim=true;性能/资源单位一致。
- 导出清单列出 DAG、调度策略、使用/扩缩与成本报告并具 sha256;引用锚点齐全,满足发布门槛。
版权与许可(CC BY 4.0)
版权声明:除另有说明外,《能量丝理论》(含文本、图表、插图、符号与公式)的著作权由作者(“屠广林”先生)享有。
许可方式:本作品采用 Creative Commons 署名 4.0 国际许可协议(CC BY 4.0)进行许可;在注明作者与来源的前提下,允许为商业或非商业目的进行复制、转载、节选、改编与再分发。
署名格式(建议):作者:“屠广林”;作品:《能量丝理论》;来源:energyfilament.org;许可证:CC BY 4.0。
首次发布: 2025-11-11|当前版本:v5.1
协议链接:https://creativecommons.org/licenses/by/4.0/