/TechLab
数据工程
大数据Flink流处理

Apache Flink 实时数据处理入门到实战

陈昊然 · 2026年3月10日 · 30 分钟

为什么选择 Flink?

在实时数据处理领域,Apache Flink 已经成为事实标准。相比 Spark Streaming 的微批处理,Flink 提供了真正的事件驱动流处理能力,在延迟、吞吐量和状态管理方面都有显著优势。

流处理引擎对比

特性FlinkSpark StreamingKafka Streams
处理模型真流处理微批处理真流处理
延迟毫秒级秒级毫秒级
状态管理内置 Keyed State需依赖外部存储内置 State Store
窗口机制丰富灵活基本基本
语义保证Exactly-onceExactly-onceExactly-once
部署方式独立集群/K8s/YARN依赖 Spark 集群嵌入式(无集群)
适用规模大规模分布式大规模分布式中小规模
SQL 支持Flink SQL(成熟)Spark SQLKSQL

我们的选型建议:

  • 已有 Spark 生态且延迟容忍秒级 → 继续 Spark Streaming
  • 轻量级场景,数据在 Kafka 中 → Kafka Streams
  • 需要毫秒级延迟 + 复杂状态管理 → Flink

核心概念深入

流与批的统一

Flink 的核心理念是"流批一体"——批处理是流处理的特例(有界流)。这意味着你只需要掌握一套 API,就能同时处理实时流和历史批数据。

从 Flink 1.12 开始的 DataStream API 统一,到 Flink SQL 的流批一体,这个理念已经完全落地:

-- 同一条 SQL,既能跑流也能跑批
SELECT
  category,
  TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount
FROM orders
GROUP BY category, TUMBLE(event_time, INTERVAL '1' HOUR);

时间语义

Flink 支持三种时间语义:

  • Event Time:事件实际发生的时间,从数据本身提取
  • Ingestion Time:事件进入 Flink 的时间
  • Processing Time:算子处理事件的时间

生产环境中,99% 的场景应该使用 Event Time。 只有 Event Time 才能保证结果的确定性——即使数据乱序到达或任务重启重放,结果都是一致的。

Watermark 机制详解

Watermark 是 Flink 处理乱序数据的核心机制。简单来说,Watermark 告诉 Flink:"在这个时间点之前的数据应该都到了"。

Watermark 生成策略:

// 策略1:固定延迟(最常用)
WatermarkStrategy
    .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.getTimestamp());

// 策略2:单调递增(数据严格有序时使用)
WatermarkStrategy
    .<OrderEvent>forMonotonousTimestamps()
    .withTimestampAssigner((event, ts) -> event.getTimestamp());

// 策略3:自定义(复杂业务场景)
WatermarkStrategy
    .<OrderEvent>forGenerator(ctx -> new CustomWatermarkGenerator())
    .withTimestampAssigner((event, ts) -> event.getTimestamp());

设置延迟的经验值:

  • 数据源是内部系统 → 1-5 秒
  • 数据源是移动端上报 → 5-30 秒
  • 数据源是 IoT 设备 → 30 秒 - 5 分钟
  • 原则:延迟设大了影响时效性,设小了丢数据

迟到数据处理:

即使有 Watermark,仍然会有数据迟到。Flink 提供了分层处理机制:

OutputTag<OrderEvent> lateTag = new OutputTag<>("late-data"){};

SingleOutputStreamOperator<Result> result = orders
    .keyBy(OrderEvent::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))     // 额外等待1分钟
    .sideOutputLateData(lateTag)          // 超过1分钟的数据走侧输出
    .aggregate(new GmvAggregateFunction());

// 处理迟到数据(写入专门的表或重新计算)
DataStream<OrderEvent> lateData = result.getSideOutput(lateTag);

状态管理

Flink 内置了强大的状态管理,这是它相比其他流处理引擎的核心优势:

Keyed State 类型:

  • ValueState<T>:存储单个值,最常用
  • ListState<T>:存储列表
  • MapState<K, V>:存储键值对
  • ReducingState<T>:自动聚合的值
  • AggregatingState<IN, OUT>:自定义聚合逻辑

实际案例——实时去重统计:

public class DistinctCountFunction extends KeyedProcessFunction<String, Event, Result> {
    private ValueState<Long> countState;
    private MapState<String, Boolean> seenState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class, 0L));
        seenState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("seen", String.class, Boolean.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        String uid = event.getUserId();
        if (!seenState.contains(uid)) {
            seenState.put(uid, true);
            countState.update(countState.value() + 1);
        }
        out.collect(new Result(event.getCategory(), countState.value()));
    }
}

状态后端深入对比

特性HashMapStateBackendEmbeddedRocksDBStateBackend
存储位置JVM 堆内存磁盘(RocksDB)
状态大小限制受限于 JVM 堆仅受磁盘空间限制
读写性能极快(内存操作)较慢(需序列化 + 磁盘 IO)
Checkpoint 方式全量快照支持增量 Checkpoint
适用场景状态小(<几 GB)状态大(TB 级)
内存管理受 GC 影响堆外内存,无 GC

生产建议: 默认使用 RocksDB。只在状态确实很小(<1GB)且延迟极其敏感的场景使用 HashMap。

实战场景

场景一:电商实时 GMV 计算

这是最经典的 Flink 入门案例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);

// 1. 读取 Kafka 订单流
DataStream<OrderEvent> orders = env
    .addSource(new FlinkKafkaConsumer<>("orders", new OrderSchema(), kafkaProps))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getTimestamp())
    );

// 2. 按品类分组,1分钟滚动窗口聚合
DataStream<CategoryGmv> gmvStream = orders
    .keyBy(OrderEvent::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new GmvAggregateFunction(), new GmvWindowFunction());

// 3. 写入 ClickHouse
gmvStream.addSink(new ClickHouseSink<>("category_gmv_realtime"));

场景二:实时异常检测(CEP)

使用 Flink CEP 库实现复杂事件模式匹配:

// 检测:同一用户在5分钟内登录失败3次以上
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("first")
    .where(new SimpleCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent event) {
            return event.getStatus().equals("FAILED");
        }
    })
    .timesOrMore(3)
    .within(Time.minutes(5));

PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginEvents.keyBy(LoginEvent::getUserId), pattern);

patternStream.select(new PatternSelectFunction<LoginEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<LoginEvent>> pattern) {
        List<LoginEvent> events = pattern.get("first");
        return new Alert(events.get(0).getUserId(),
            "BRUTE_FORCE_DETECTED", events.size());
    }
}).addSink(new AlertSink());

场景三:实时数据 Join

流与流的 Join 是实时数据处理中的高难度操作:

// 订单流 Join 支付流(10分钟间隔窗口)
DataStream<EnrichedOrder> enriched = orders
    .keyBy(OrderEvent::getOrderId)
    .intervalJoin(payments.keyBy(PaymentEvent::getOrderId))
    .between(Time.seconds(0), Time.minutes(10))
    .process(new ProcessJoinFunction<OrderEvent, PaymentEvent, EnrichedOrder>() {
        @Override
        public void processElement(OrderEvent order, PaymentEvent payment,
                                   Context ctx, Collector<EnrichedOrder> out) {
            out.collect(new EnrichedOrder(order, payment));
        }
    });

Join 策略选择:

Join 类型适用场景注意事项
Interval Join两条流有时间关联指定合理的时间范围
Window Join两条流在同一窗口内窗口大小影响匹配率
Temporal Join (SQL)流与维表关联维表需要支持时态查询
Broadcast Join小表广播到所有分区小表不能太大

生产部署要点

集群架构

Client(提交任务)
    ↓
JobManager(HA 模式,2-3 节点)
    ├── Dispatcher:接收任务
    ├── ResourceManager:管理资源
    └── JobMaster:管理作业生命周期
    ↓
TaskManager(工作节点,N 个)
    ├── Task Slot 1
    ├── Task Slot 2
    └── Task Slot N

资源规划

TaskManager 配置:

  • 内存:4-8 GB(无状态任务)/ 16-32 GB(有状态任务)
  • Slot 数:通常等于 CPU 核数
  • 实例数 = 总并行度 / 每个 TM 的 Slot 数

Parallelism 设置:

  • Source 并行度 = Kafka Partition 数
  • 中间算子并行度 = Source 1-2 倍
  • Sink 并行度根据下游写入能力调整
  • 避免数据倾斜:热点 Key 需要加盐打散

Checkpoint 配置:

env.enableCheckpointing(30000); // 30秒
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

监控告警

必须监控的指标:

  1. Checkpoint Duration:P99 超过 checkpoint 间隔的 50% 要告警
  2. Backpressure:反压持续 > 5 分钟说明处理能力不足
  3. Record Lag:Kafka consumer lag 反映整体延迟
  4. state.size:状态大小持续增长可能是状态泄漏
  5. GC Pause:频繁 Full GC 影响延迟和稳定性
  6. Restart Count:频繁重启说明有未处理的异常

Grafana Dashboard 建议包含的 Panel:

  • 消息吞吐量(条/秒)随时间变化
  • 端到端处理延迟 P50/P95/P99
  • Checkpoint 大小和耗时趋势
  • 各算子的反压状态(热力图)
  • Kafka Lag 按 Partition 展示

踩过的坑

1. 状态过期未清理

问题:使用 MapState 做去重,但数据持续增长,最终 OOM。

解决:配置状态 TTL:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
stateDescriptor.enableTimeToLive(ttlConfig);

2. 数据倾斜

问题:某些热点品类的订单量远超其他品类,导致个别 Subtask 负载过高。

解决:二次聚合模式:

// 第一层:加盐打散,预聚合
orders
    .map(e -> { e.setCategory(e.getCategory() + "-" + ThreadLocalRandom.current().nextInt(10)); return e; })
    .keyBy(OrderEvent::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new PreAggFunction())
    // 第二层:去盐,最终聚合
    .map(r -> { r.setCategory(r.getCategory().split("-")[0]); return r; })
    .keyBy(Result::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new FinalAggFunction());

3. Kafka Offset 提交策略

问题:Checkpoint 成功但 Kafka offset 未提交,任务重启后大量重复消费。

解决:确保 enable.auto.commit 设为 false,让 Flink 通过 Checkpoint 机制管理 offset。

4. 序列化性能

问题:使用 Java 默认序列化导致 Checkpoint 极慢。

解决:使用 POJO 类型或注册 Kryo 序列化器。最优方案是让数据类满足 Flink 的 POJO 条件(公有类、无参构造、getter/setter)。

5. 升级任务的状态兼容

问题:修改了处理逻辑后重新部署,旧的 Savepoint 状态无法恢复。

解决

  • 使用 uid() 为每个算子指定稳定的 ID
  • 修改 State Schema 时遵循兼容性规则(只加字段,不删/改)
  • 重大变更时使用 State Processor API 做状态迁移

从 Spark Streaming 迁移

如果你的团队目前使用 Spark Structured Streaming,迁移到 Flink 需要注意:

  • 窗口语义差异较大,需要重新设计
  • Flink 的水印机制更灵活但也更复杂
  • 状态管理方式完全不同,存量状态无法直接迁移
  • Flink SQL 的语法与 Spark SQL 有差异
  • 建议新项目用 Flink,存量项目评估迁移 ROI 再决定

迁移策略: 不建议大规模一次性迁移。用新项目作为试点,积累经验后再逐步迁移存量任务。

总结

Flink 是构建实时数据管道的最佳选择之一。掌握其核心概念——时间语义、Watermark、状态管理、窗口机制——是用好 Flink 的关键。在实际项目中,数据倾斜、状态管理和 Checkpoint 优化是最常遇到的挑战,也是区分初级和高级 Flink 工程师的关键能力。

投入时间理解 Flink 的内部机制,远比只学会用 API 更有价值。当你理解了 Watermark 如何传播、Checkpoint 如何工作、状态如何管理,遇到问题时才能快速定位和解决。