/TechLab
数据工程
数据工程可观测性Airflow

从零到一:构建可观测的数据管道

陈昊然 · 2026年2月10日 · 28 分钟

数据管道的可观测性困境

后端服务的可观测性已有成熟方案——Metrics、Logging、Tracing 三大支柱是行业共识,Prometheus、Grafana、Jaeger 是标配工具。但数据管道的可观测性远没有这么标准化。

为什么数据管道的可观测性更难?

相比在线服务,数据管道有几个独特的挑战:

  1. 批处理为主:不像在线服务有持续的请求流,批处理任务可能几小时跑一次,异常检测的信号密度低
  2. 链路更长:一条数据从源头到最终报表可能经过 10+ 个步骤,跨多个引擎和系统
  3. "正确"的定义更模糊:服务要么返回 200 要么返回 500,数据管道可能"成功执行"但输出了错误的数据
  4. 时间维度复杂:数据有 Event Time、Processing Time、Wall Clock Time 三个时间维度
  5. 影响是延迟显现的:管道出错可能要等到业务方看报表时才发现,中间可能已经过去几个小时

常见的痛点

我们在多个客户项目中反复遇到的问题:

  • 任务失败了,不知道影响范围有多大——这个失败会导致哪些下游报表延迟?
  • 数据延迟了,不知道卡在哪一步——是数据源问题?ETL 慢了?还是资源不够?
  • 数据质量下降了,不知道什么时候开始的——是今天的变更引起的还是早就有了?
  • 成本在增长,不知道哪些任务消耗最大——Spark 作业的资源利用率如何?

可观测性框架

我们提出数据管道可观测性的四个维度,每个维度回答不同的问题:

1. 执行可观测性(Execution)

核心问题:任务跑了吗?跑成功了吗?

这是最基础的维度,也是最容易实现的:

关键指标:

指标含义典型告警规则
DAG 运行状态成功/失败/运行中任务失败即告警
任务执行时间每个 Task 的耗时耗时超过历史均值 2 倍
失败率近 7 天的任务失败比例失败率 > 5%
重试率自动重试的比例重试率 > 20%(说明不稳定)
SLA 达标率数据是否在承诺时间前就绪SLA 达标率 < 95%
任务队列深度等待执行的任务数队列积压 > 阈值

Airflow + Prometheus 集成:

# 在 airflow.cfg 中启用 StatsD
[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 9125
statsd_prefix = airflow

# StatsD Exporter 将指标转换为 Prometheus 格式
# 然后 Grafana 连接 Prometheus 展示

我们建设的 Airflow 监控 Dashboard 包含:

  • DAG 运行状态概览(绿/黄/红)
  • 每日任务执行时间趋势(折线图)
  • 最近失败的任务列表(表格)
  • SLA 达标率趋势(柱状图)
  • Worker 资源利用率(CPU/内存/磁盘)

2. 数据质量可观测性(Quality)

核心问题:数据对吗?

执行成功 ≠ 数据正确。数据质量可观测性是整个体系中最有价值但也最复杂的部分。

数据质量的分层检查:

第1层:结构性检查(必须)
  ├── 表/分区是否存在
  ├── 行数是否在合理范围
  └── Schema 是否变化

第2层:完整性检查(必须)
  ├── 核心字段的空值率
  ├── 主键的唯一性
  └── 外键关联的完整性

第3层:准确性检查(按需)
  ├── 数值范围检查
  ├── 枚举值合法性
  └── 业务规则验证

第4层:统计性检查(按需)
  ├── 分布异常检测
  ├── 趋势异常检测
  └── 跨源数据一致性

Great Expectations 实战配置:

# great_expectations/expectations/order_summary_suite.json
{
  "expectation_suite_name": "order_summary_quality",
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": { "min_value": 10000, "max_value": 1000000 }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": { "column": "order_id" }
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": { "column": "order_id" }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "amount",
        "min_value": 0,
        "max_value": 1000000
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "status",
        "value_set": ["pending", "paid", "shipped", "delivered", "cancelled"]
      }
    }
  ]
}

Soda Core 作为替代方案:

# soda/checks/order_summary.yml
checks for order_summary:
  - row_count > 10000
  - missing_count(order_id) = 0
  - duplicate_count(order_id) = 0
  - min(amount) >= 0
  - max(amount) <= 1000000
  - values in (status) must be in ['pending', 'paid', 'shipped', 'delivered', 'cancelled']
  # 统计性检查
  - anomaly detection for row_count
  - anomaly detection for avg(amount)

我们的选择: Soda Core 语法更简洁,适合快速上手;Great Expectations 功能更强大,适合复杂场景。两者都支持集成到 Airflow 任务中。

数据新鲜度(Freshness)监控:

数据新鲜度是最容易被忽视但最影响业务信任度的指标:

-- 检查数据新鲜度的 SQL
SELECT
  MAX(updated_at) as latest_record,
  NOW() - MAX(updated_at) as freshness_lag
FROM order_summary;
-- 如果 freshness_lag > SLA 阈值,触发告警

3. 血缘可观测性(Lineage)

核心问题:数据从哪来?到哪去?

这部分的详细内容请参考我们的《构建企业级数据血缘系统的实践》一文。在可观测性的语境下,血缘系统需要支持:

  • 影响分析:一个上游变更影响哪些下游任务和报表
  • 根因定位:数据异常时沿血缘反向追踪到问题源头
  • 变更追踪:Schema 变更、逻辑变更的历史记录

血缘 + 质量联动(杀手级功能):

在血缘图上叠加数据质量信息:

  • 节点颜色表示健康状态(绿色=正常、黄色=告警、红色=异常)
  • 异常时自动高亮可能受影响的下游节点
  • 点击节点查看最近的质量检查报告

4. 成本可观测性(Cost)

核心问题:花了多少钱?值不值?

随着数据量增长,成本控制变得越来越重要:

Spark/Flink 作业的成本分析:

# 采集 Spark 作业的资源消耗指标
# 通过 Spark REST API 获取
import requests

def get_spark_app_metrics(spark_history_url, app_id):
    resp = requests.get(f"{spark_history_url}/api/v1/applications/{app_id}")
    app_info = resp.json()

    # 计算资源消耗
    executor_count = len(app_info.get("attempts", [{}])[0].get("sparkUser", ""))
    duration_seconds = (app_info["attempts"][0]["endTimeEpoch"] -
                       app_info["attempts"][0]["startTimeEpoch"]) / 1000

    return {
        "app_id": app_id,
        "duration_s": duration_seconds,
        "executor_count": executor_count,
        # 成本 = 资源 × 时间 × 单价
        "estimated_cost_usd": executor_count * duration_seconds / 3600 * COST_PER_HOUR
    }

成本看板应包含:

  • 每个 DAG/任务的日均成本(按计算资源估算)
  • 成本 Top-10 排行榜
  • 成本趋势(环比/同比增长率)
  • 按部门/业务线的成本分摊

存储成本追踪:

  • 每个 Database/Table 的存储占用趋势
  • 冷数据(长期未访问)识别
  • 存储层级建议(热数据 → SSD,冷数据 → S3)

实施路径

第一阶段:基础监控(1-2 周)

目标: 知道任务跑没跑,跑成功没

具体步骤:

  1. Airflow DAG 监控仪表盘(Dashboard)
  2. 任务失败告警到企业微信/钉钉/飞书
  3. 关键路径的 SLA 监控
  4. Worker 节点的资源监控(CPU/内存/磁盘)

技术方案:

  • Airflow + StatsD Exporter + Prometheus + Grafana
  • 告警通过 Alertmanager 发送到企业 IM

投入预估: 1 个数据工程师 × 1-2 周

第二阶段:数据质量(2-4 周)

目标: 知道数据对不对

具体步骤:

  1. 识别 Top-20 核心数据表
  2. 为每张表定义质量检查规则
  3. 将质量检查集成到 Airflow DAG 中
  4. 质量报告自动生成和分发
  5. 建立质量规则库(方便复用)

技术方案:

  • Soda Core / Great Expectations
  • 质量检查作为 Airflow Task 运行
  • 检查结果写入 Prometheus,在 Grafana 展示

投入预估: 1-2 个数据工程师 × 2-4 周

第三阶段:血缘与成本(1-2 月)

目标: 知道数据从哪来,花了多少钱

具体步骤:

  1. OpenLineage 集成(Spark、Airflow)
  2. 部署 Marquez 或 DataHub
  3. 血缘可视化
  4. 成本采集与分析

投入预估: 2-3 个数据工程师 × 1-2 月

第四阶段:智能化(持续)

目标: 从被动告警到主动预测

方向:

  • 异常检测自动化(基于历史数据的统计模型)
  • 根因分析自动化(结合血缘关系定位问题源头)
  • 容量预测(预测存储和计算资源的增长趋势)
  • 成本优化建议(自动识别低效任务)

告警策略

告警不是越多越好——告警疲劳是可观测性系统最大的敌人。关键是分层分级:

告警分级

等级场景通知方式响应时间
P0核心业务管道失败电话 + IM15 分钟
P1数据质量异常(核心表)IM 消息1 小时
P2任务延迟超过 SLAIM 消息4 小时
P3资源使用异常(非紧急)邮件次日
P4趋势异常(统计检测)周报汇总下次迭代

告警收敛策略

# Alertmanager 配置片段
route:
  group_by: ['alertname', 'dag_id']
  group_wait: 30s       # 等待30秒收敛同类告警
  group_interval: 5m    # 同一组告警的发送间隔
  repeat_interval: 4h   # 告警重复提醒间隔

  routes:
    - match:
        severity: p0
      receiver: phone-call
      repeat_interval: 15m

    - match:
        severity: p1
      receiver: wecom-webhook

    - match:
        severity: p2
      receiver: wecom-webhook
      group_wait: 5m

    - match:
        severity: p3
      receiver: email
      repeat_interval: 24h

减少误报的技巧

  1. 设置合理的阈值:基于历史数据的 P95/P99 值,而不是拍脑袋
  2. 引入静默期:已知的维护窗口期内静默告警
  3. 关联抑制:上游任务已经告警了,下游任务的连锁告警自动抑制
  4. 趋势告警替代绝对告警:不是"行数 < 10000"而是"行数相比昨天下降 > 50%"

工具选型总结

维度推荐工具替代方案
任务调度AirflowDagster, Prefect
指标采集Prometheus + StatsD ExporterInfluxDB
可视化GrafanaSuperset
告警AlertmanagerPagerDuty
数据质量Soda CoreGreat Expectations
数据血缘OpenLineage + MarquezDataHub
日志LokiELK Stack

总结

可观测的数据管道不是奢侈品,而是可靠数据平台的必要条件。

我们的核心建议:

  1. 从基础监控开始,不要试图一步建成完整体系
  2. 数据质量检查的 ROI 最高,优先投入
  3. 告警策略比告警工具更重要——合理分级、避免疲劳
  4. 血缘系统的价值在长期显现,早建设早受益
  5. 成本可观测性在数据规模增长后会变得越来越重要

从今天开始,让你的数据管道"开口说话"——它跑得怎么样、数据对不对、花了多少钱。这些问题的答案,不应该靠人猜。