一文教你弄懂Flink核心功能和原理(flink原理、实战与性能优化)
导读1.Flink概述Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink 是做 Batch 计算的,但是在 2014 年,StratoSphere 里面的核心成员孵化出 Fl...
1.Flink概述
Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink 是做 Batch 计算的,但是在 2014 年,StratoSphere 里面的核心成员孵化出 Flink,同年将 Flink 捐赠 Apache,并在后来成为 Apache 的顶级大数据项目,同时 Flink 计算的主流方向被定位为 Streaming,即用流式计算来做所有大数据的计算,这就是 Flink 技术诞生的背景。
Flink 官网:https://flink.apache.org/
核心理念:Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架

Apache Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同 一个 Flink 运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的 SLA 是完全不相同的:流处理一般需要支持低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有 MapReduce、Tez、Crunch、 Spark,实现流处理的开源方案有 Samza、Storm。
Flink 在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基 于同一个 Flink 运行时(Flink Runtime),分别提供了流处理和批处理 API,而这两种 API 也是 实现上层面向流处理、批处理类型应用框架的基础。

总的来说,现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们所提供 的 SLA(Service-Level-Aggreement,服务等级协议)是完全不相同的:
流处理一般需要支持低延迟、Exactly-once 保证,
批处理需要支持高吞吐、高效处理。
Flink 从另一个视角看待流处理和批处理,将二者统一起来:
Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;
批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。
2.Flink的特性
2.1Flink核心特性
支持高吞吐、低延迟、高性能的流处理
支持带有事件时间的窗口(Window)操作
支持有状态计算的 Exactly-once 语义
支持高度灵活的窗口(time/count/session)Window 操作,以及 data-driven 驱动
支持具有 BackPressure 功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
一个运行时同时支持 Batch on Streaming 处理和Streaming 处理
Flink 在 JVM 内部实现了自己的内存管理
支持迭代计算
支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果进行缓存
2.2Flink特点
Streaming-first:流处理引擎
Fault-tolerant:容错,可靠性,checkpoint
Scalable:可扩展性,1000 节点以上
Performance:性能,高吞吐量,低延迟
2.3Flink关键特性
低延时:提供 ms 级时延的处理能力
Exactly Once:提供异步快照机制,保证所有数据真正处理一次
HA:JobManager 支持主备模式,保证无单点故障
水平扩展能力:TaskManager 支持手动水平扩展
2.4Hadoop兼容性
Flink 能够支持 YARN,能够从 HDFS 和 HBase 中获取数据
能够使用所有的 Hadoop 的格式化输入和输出
能够使用 Hadoop 原有的 Mers 和 Reducers,并且能与 Flink 的操作混合使用
能够更快的运行 Hadoop 作业
3.Flink的优势


流场景使用案例 | 正确性保证 | API 分层体系 |
1、数据驱动的应用 2、批流数据分析 3、数据通道和 ETL | 1、Exactly-once 状态一致性保 证 2、事件时间处理 3、复杂的 late date 处理 | 1、统一 SQL 支持 Stream 和 Batch 数据处理 2、DataStream API & DataSet API 3、ProcessFunction (Time & State) |
Operational Focus | 适用于各种应用场景 | 高性能 |
1、部署灵活 2、高可用配置 3、Savepoints | 1、架构可扩展 2、超大 state 支持 3、增量 checkpointing | 1、低延时 2、高吞吐 3、内存计算 |
All streaming use cases:所有的流式用例
Guaranteed correctness:保证数据的准确性
Layered APIs:分层的API
Operational Focus:重点放在流处理的操作方面
Scales to any use case:可扩展至任何用例
Excellent Performance:完美的表现
4.Flink核心四大基石

Apache Flink 之所以能越来越受欢迎,我们认为离不开它最重要的四个基石:
Checkpoint、State、Time、Window

5.Flink应用场景
Event-driven Applications:事件驱动应用
Data Analytics Applications:数据分析应用
Data Pipeline Applications:数据挖掘应用
Flink 最适合的应用场景是低延时的数据处理场景:高并发处理数据,实验毫秒级,且兼具可靠性。
典型应用场景有互联网金融业务、点击流日志处理、舆情监控。
优化电子商务的实时搜索结果:阿里巴巴的所有基础设施团队使用 flink 实时更新产品细节和库存信息,为用户提供更高的关联性。
针对数据分析团队提供实时流处理服务:king 通过 flink-powered 数据分析平台提供实 时数据分析,从游戏数据中大幅缩短了观察时间
网络/传感器检测和错误检测:Bouygues 电信公司,是法国最大的电信供应商之一, 使用 flink 监控其有线和无线网络,实现快速故障响应。
商业智能分析 ETL:Zalando 使用 flink 转换数据以便于加载到数据仓库,将复杂的转换操作转化为相对简单的并确保分析终端用户可以更快的访问数据。

Flink 的典型功能:
Flink 是一个纯流式系统,吞吐量实际测试可达 100K EPS。而不像某些框架是用 mini batch 的模式来达到所谓的流式处理的;
面对不同的用户数据格式,我们必须支持多种数据源,这一点上 Flink 内置的对多种数 据源的支持(CSV,Kafka,Hbase,Text,Socket 数据等)也为用户数据的接入提供了便利;
Flink 强大的窗口机制(包括翻转窗口,滑动窗口,session 窗口,全窗口以及允许用户 自定义窗口)可以满足复杂的业务逻辑,使得用户可以编写复杂的业务规则;
Flink 内置的 RocksDB 数据存储格式使其数据处速度快且资源消耗少,在 Checkpoint 上 起到了至关重要的作用;
Flink 对算子(operator)的高可控性,使得用户可以灵活添加删除或更改算子行为。 这一点对于动态部署有着至关重要的意义。
Flink 非常适合于:
多种数据源(有时不可靠):当数据是由数以百万计的不同用户或设备产生的,它是安全的假设数据会按照事件产生的顺序到达,和在上游数据失败的情况下,一些事件可能会比他们晚几个小时,迟到的数据也需要计算,这样的结果是准确的。
应用程序状态管理:当程序变得更加的复杂,比简单的过滤或者增强的数据结构,这个时候管理这些应用的状态将会变得比较难(例如:计数器,过去数据的窗口,状态机,内置数据库)。Flink 提供了工具,这些状态是有效的,容错的,和可控的,所以你不需要 自己构建这些功能。
数据的快速处理:有一个焦点在实时或近实时用例场景中,从数据生成的那个时刻, 数据就应该是可达的。在必要的时候,Flink 完全有能力满足这些延迟。
海量数据处理:这些程序需要分布在很多节点运行来支持所需的规模。Flink 可以在大 型的集群中无缝运行,就像是在一个小集群一样。
6.Flink执行引擎解析/架构
6.1Flink集群架构

我们可以了解到 Flink 几个最基础的概念,Client、JobManager 和 TaskManager。Client 用来提交任务给 JobManager,JobManager 分发任务给 TaskManager 去执行,然后 TaskManager 会心跳的汇报任务状态。看到这里,有的人应该已经有种回到 Hadoop 一代的错觉。确实,从架构去看,JobManager 很像当年的 JobTracker,TaskManager 也很像当年的 TaskTracker。 然而有一个最重要的区别就 TaskManager 之间是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之间的 Shuffle,而对 Flink 而言,可能是很多级,并且在 TaskManager 内 部和 TaskManager 之间都会有数据传递,而不像 Hadoop,是固定的 Map 到 Reduce。
6.2JobManagers, TaskManagers, Clients
Flink 是一个分布式的主从架构,既集群运行时是由主节点和从节点组成。Flink 的分布式执行包括两个重要的进程:Master 和 Worker。执行 Flink 程序时,多个进程参与执行,即作业 管理器(JobManager),任务管理器(TaskManager)和作业(JobClient)。

Flink程序需要提交给 Job Client。然后,JobClient 将作业提交给 JobManager。JobManager 负责协调资源分配和作业执行。它首先要做的是分配所需的资源。资源分配完成后,任务将提交给相应的 TaskManager。在接收任务时,TaskManager 启动一个线程以开始执行。执行到 位时,TaskManager 会继续向 JobManager 报告状态更改。可以有各种状态,例如开始执行, 正在进行或已完成。作业执行完成后,结果将发送回(JobClient)。
要了解一个系统,一般都是从架构开始。我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的。下方是 Flink 集群任务启动后架构:

Program Code:编写的 Flink 应用程序代码
Job Client:JobClient 不是 Flink 程序执行的内部部分,但它是任务执行的起点。JobClient 负责接受用户的程序代码,然后创建数据流,将数据流提交给 JobManager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户
JobManager:主进程(也称为作业管理器)协调和管理程序的执行。它的主要职责包括安排任务,管理 checkpoint,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多 master,但要保证一个是 leader, 其他是 standby;JobManager 包含 ActorSystem、Scheduler、CheckPoint三个重要的组件
TaskManager:从 JobManager 处接收需要部署的 Task。TaskManager 是在 JVM 中的一个或多个线程中执行任务的工作节点。任务执行的并行性由每个 TaskManager 上可用的任务槽决定。每个任务代表分配给任务槽的一组资源。例如,如果 TaskManager 有四个插槽,那么 它将为每个插槽分配 25%的内存。可以在任务槽中运行一个或多个线程。同一插槽中的线程共享相同的 JVM。同一 JVM 中的任务共享 TCP 连接和心跳消息。TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。 默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。
TaskSlot:任务槽,类似于 YARN 当中的 Container,用于资源的封装。但是在 Flink 中, taskSlot 只负责封装内存的资源,不包含 CPU 的资源。每一个 TaskManager 中会包含 3 个 TaskSlot,所以每一个 TaskManager 中最多能并发执行的任务是可控的,最多 3 个。TaskSlot 有独占的内存资源,在一个 TaskManager 中可以运行不同的任务。
Task:TsakSlot 当中的 Task 就是任务执行的具体单元。
6.3Task Slots and Resources
每个 worker (TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个子任务 task。为了控制 worker 接受多少 task,worker 具有所谓的 task slot(至少一个)。
每个 task slot 表示 TaskManager 资源的一个固定子集。例如,一个有三个 slots 的 TaskManager 会将其 1/3 的托管内存分配给每个插槽。对资源进行插槽管理意味着子任务不会与来自其他作业的子任务争夺托管内存,而是拥有一定数量的预留托管内存。注意,这里没有发生 CPU 隔离;当前插槽只分隔任务的托管内存。
通过调整任务槽的数量,用户可以定义子任务如何彼此隔离。每个 TaskManager 有一个插槽意味着每个任务组运行在单独的 JVM 中(例如,可以在单独的容器中启动 JVM)。拥有多个插槽意味着更多的子任务共享同一个 JVM。相同 JVM 中的任务共享 TCP 连接(通过多路复用) 和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。

默认情况下,Flink 允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自相 同的作业。结果是一个槽可以容纳作业的整个管道。允许这个插槽共享有两个主要好处:
Flink 集群需要的任务插槽与作业中使用的最高并行度一样多。不需要计算一个程序总共包 含多少任务(具有不同的并行度)。
更容易得到更好的资源利用。如果没有插槽共享,非密集型 source/map()子任务将阻塞与资 源密集型窗口子任务一样多的资源。使用插槽共享,将我们示例中的基本并行度从 2 提高到 6,可以充分利用插槽资源,同时确保繁重的子任务在任务管理器中得到公平分配。

6.4Tasks and Operator Chains
对于分布式执行,将操作符子任务一起链接到任务中。每个任务由一个线程执行。将操作符链接到任务中是一种有用的优化:它减少了线程到线程切换和缓冲的开销,增加了总体吞吐量,同时降低了延迟,可以对链接行为进行配置。下中的示例数据流使用 5 个子任务执行,因此使用 5 个并行线程。

6.5Flink组织架构

Flink 具有分层架构,其中每个组件都是特定层的一部分。每个层都建立在其他层之上,以实现清晰的抽象。Flink 旨在在本地,YARN 群集或云上运行。Runtime 是 Flink 的核心数据处理引擎,它通过 JobGraph 形式的 API 接收程序,在执行 JobGraph 时,Flink 提供了多种候选部署方案(如 local,remote,YARN 等)。JobGraph 即为一个一般化的并行数据流(data flow), 它拥有任意数量的 Task 来接收和产生 data stream。
DataStream 和 DataSet API 是程序员可用于定义 Job 的接口。编译程序时,这些 API 会生成 JobGraphs。编译后,DataSet API 允许优化器生成最佳执行计划,而 DataStream API 使用流 构建来实现高效的执行计划。DataStream API 和 DataSet API 都会使用单独编译的处理方式生 成 JobGraph。DataSet API 使用 optimizer 来决定针对程序的优化方法,而 DataStream API 则 使用 stream builder 来完成该任务。
然后根据部署模型将优化的 JobGraph 提交给执行程序。可以选择本地,远程或 YARN 部署模式。如果已经运行了 Hadoop 集群,那么最好使用 YARN 部署模式。
Flink附随了一些产生DataSet或DataStream API程序的的类库和API:处理逻辑表查询的Table, 机器学习的 FlinkML,像处理的 Gelly,复杂事件处理的 CEP。

7.Flink容错 State 和 Checkpoint
7.1状态解释
在批处理过程中,数据是划分为块分片去完成的,然后每一个 Task 去处理一个分片。当 分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于 state 的需求还是比较小的。在流计算过程中,对 State 有非常高的要求,因为在流系统中输入是一个无限制的流, 会持续运行从不间断。在这个过程当中,就需要将状态数据很好的管理起来。Flink 的失败恢复依赖于“检查点机制+可部分重发的数据源”。检查点机制:检查点定期触发,产生快照,快照中记录了:(1)当前检查点开始时数据源(例如 Kafka)中消息的 offset (2)记录了所有有状态的 operator 当前的状态信息(例如 sum 中的数值)。可部分重发的数据源:Flink 选择最近完成的检查点 K。然后系统重放整个分布式的数据 流,然后给予每个 operator 他们在检查点 k 快照中的状态。数据源被设置为从位置 Sk 开始 重新读取流。例如在 Apache Kafka 中,那意味着告诉消费者从偏移量 Sk 开始重新消费。Flink 中有两种基本类型的 State,即 Keyed State 和 Operator State。State 可以被记录,在失败的情况下数据还可以恢复。state 一般指一个具体的 task/operator 的状态【state 数据默认保存在 JVM 的 堆内存中】
7.2State详解
我们写的 wordcount 的程序没有包含状态管理。如果一个 task 在运行中挂掉了,那 么它在内存当中的状态数据都会丢失。所有的数据都需要重新计算。从容错和消息处理的语义上,Flink 引入了 State 和 CheckPoint
首先区分两个概念:
State 一般是指一个具体的 task/operator 的状态(state 数据默认保存在 java 的堆内存中)
checkpoint 则表示了一个 Flink Job 在一个特定时刻的一份全局状态快照,即包含了所有 的 task/operator 的状态。可以理解成 checkpoint 是把所有 state 数据持久化存储了。
State 可以被记录,这样可以在失败的情况下进行恢复
Flink 中有两种 State
KeyedState
OperatorState
KeyedState 和 OperatorState 都有两种形式存在
原始状态 RawState 原始状态,是由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候,使用 byte[]来读写内容,对其内部数据结构一无所知
托管状态 ManagedState 托管状态是由 Flink 框架管理的状态
通常,在 DataStream 上的状态,推荐使用托管的状态,当实现一个用户自定义的 Operator 的时候,会使用到原始状态

7.3CheckPoint
checkpoint【可以理解为 checkpoint 是把 state 数据持久化存储了】,则表示 了一个 Flink Job 在一个特定时刻的一份全局状态快照,即包含了所有 task/operator 的状态
CheckPoint 是 Flink 容错的主要机制。它不断为分布式数据流和 executor 状态拍摄快照。它的思想来自 Chandy-Lamport 算法,但已根据 Flink 的定制要求进行了修改。
Flink 基于 Chandy-Lamport 算法实现了一个分布式的一致性快照,从而提供了一致性的语义。 Chandy-Lamport 算法实际上在 1985 年的时候已经被提出来,但并没有被很广泛的应用,而 Flink 则把这个算法发扬光大了。Spark 最近在实现 Continue streaming,Continue streaming 的目的是为了 降低它处理的延时,其也需要提供这种一致性的语义,最终采用 Chandy-Lamport 这个算法,说明 Chandy-Lamport 算法在业界得到了一定的肯定。
每个快照状态都会报告给 Flink 作业管理器(JobManager)的检查点协调器。在制作快照时, Flink 处理记录对齐,以避免因任何故障而重新处理相同的记录。这种对齐通常需要几毫秒。 但是对于某些要求高的应用程序,即使毫秒级的延迟也是不可接受的,我们可以选择在单个记录处理中选择低延迟。默认情况下,Flink 只处理每个记录一次。如果任何应用程序需要低延迟并且至少在一次交付就可以,我们可以关闭该触发器。这将跳过对齐并将改善延迟。
容错: checkpoint 是很重要的机制,因为 Flink 的检查点是通过分布式快照实现的,所以这里对快照和检查点不进行区分。
分布式数据流的轻量级异步快照
分布式有状态流处理支持在云中部署和执行大规模连续计算,同时针对低延迟和高吞吐量。这种模式最基本的挑战之一是在潜在的失败下提供处理保证。现有方法依赖于可用于故障恢复的周期性全局状态快照。这些方法有两个主要缺点。首先,它们经常会停止影响摄取的整 体计算。其次,他们急切地坚持传输中的所有记录以及操作状态,这导致比所需更快的快照。 在这项工作中,提出了异步屏障快照(ABS),这是一种适用于现代数据流执行引擎的轻量级算法,可最大限度地减少空间需求。ABS 仅保留非循环执行拓扑上的运算符状态,同时保持循环数据流的最小记录日志。我们在 Apache Flink 上实现了 ABS,这是一个支持有状态流处理的分布式分析引擎。我们的评估表明,我们的算法对执行没有太大的影响,保持线性可扩展性并且在频繁的快照中表现良好。
7.4Barriers Flink
分布式快照的核心概念之一是 barriers。这些 barriers 被注入数据流并与记录一起作为 数据流的一部分向下流动。barriers 永远不会超过记录,数据流严格有序。barriers 将数据流中的记录分为进入当前快照的记录和进入下一个快照的记录。每个 barriers 都带有快照的 ID, 并且 barriers 之前的记录都进入了该快照。barriers 不会中断流的流动,非常轻量级。来自不同快照的多个 barriers 可以同时在流中出现,这意味着可以同时发生各种快照。
流 barriers 是 Flink 快照的核心要素。它们被摄取到数据流中而不会影响流量。barriers 永远不会超过记录。他们将记录集合分为快照。每个 barriers 都带有唯一的 ID。下显示了如何 将 barriers 注入到快照的数据流中:

基于上:
出现一个 Barrier,在该 Barrier 之前出现的记录都属于该 Barrier 对应的 Snapshot,在该 Barrier 之后出现的记录属于下一个 Snapshot
来自不同 Snapshot 多个 Barrier 可能同时出现在数据流中,也就是说同一个时刻可能并发生成多个 Snapshot
当一个中间(Intermediate)Operator 接收到一个 Barrier 后,它会发送 Barrier 到属于该 Barrier 的 Snapshot 的数据流中,等到 Sink Operator 接收到该 Barrier 后会向 Checkpoint Coordinator 确认该 Snapshot,直到所有的 Sink Operator 都确认了该 Snapshot,才被认为 完成了该 Snapshot

一旦操作算子从一个输入流接收到快照 barriers n,它就不能处理来自该流的任何记录, 直到它从其他输入接收到 barriers n 为止。否则,它会搞混属于快照 n 的记录和属于快照 n + 1 的记录。
barriers n 所属的流暂时会被搁置。从这些流接收的记录不会被处理,而是放入输入缓冲区。
一旦从最后一个流接收到 barriers n,操作算子就会发出所有挂起的向后传送的记录, 然后自己发出快照 n 的 barriers。
之后,它恢复处理来自所有输入流的记录,在处理来自流的记录之前优先处理来自输 入缓冲区的记录
Barrier 分为两类:
BarrierBuffer 通过阻塞已接收到 barrier 的 input channel 并缓存被阻塞的 channel 中后续流入的数据流,直到所有的 barrier 都接收到或者不满足特定的检查点的条件后,才会释放这些被阻塞的 channel,这个机制被称之为–aligning(对齐)。正是这种机制来实现 EXACTLY_ONCE 的一致性(它将检查点中的数据精准得隔离开)。
而 BarrierTrack 的实现就要简单地多,它仅仅是对数据流中的 barrier 进行跟踪,但是数据流 中的元素 buffer 是直接放行的。这种情况会导致同一个检查点中可能会预先混入后续检查 点的元素,从而只能提供 AT_LEAST_ONCE 的一致性。
Snapshot 并不仅仅是对数据流做了一个状态的 Checkpoint,它也包含了一个 Operator 内部所 持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。
barrier 作用:它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进 行分组,并沿着数据流的方向向前推进
具体排列过程如下:
Operator 从一个 incoming Stream 接收到 Snapshot Barrier n,然后暂停处理,直到其它的 incoming Stream 的 Barrier n(否则属于 2 个 Snapshot 的记录就混在一起了)到达该 Operator 接收到 Barrier n 的 Stream被临时搁置,来自这些 Stream 的记录不会被处理,而 是被放在一个 Buffer 中。
一旦最后一个 Stream 接收到 Barrier n,Operator 会 emit 所有暂存在 Buffer 中的记录, 然后向 Checkpoint Coordinator 发送 Snapshot n,继续处理来自多个 Stream 的记录
基于 Stream Aligning 操作能够实现 Exactly Once 语义,但是也会给流处理应用带来延迟,因为为了排列对齐 Barrier,会暂时缓存一部分 Stream 的记录到 Buffer 中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐 Barrier 的一个 Stream 为处理 Buffer 中缓存记录的时刻点。在 Flink 中,提供了一个开关,选择是否使用 Stream Aligning, 如果关掉则 Exactly Once 会变成 At least once。
8.Flink Time
8.1Time 解读
Event Time 也就是事件发生的时间,事件的发生时间。这里举个例子,我们产生日志的时间,这个应该清楚的,日志的时间戳就是发生时间。 在 Flink 的流式处理中,绝大部分的业务都会使用 EventTime,一般只在 EventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。

Ingestion Time 也就是提取时间,也就是进入 Flink 计算程序的时间,这个时候数据已经发送给窗口,也就是发送给窗口的时间,也就是程序处理计算的时间。
Processing Time 也就是处理时间,我们看到了这个已经进入 Flink 程序,也就是我们读取数据源时间,也就是日志到达 Flink 的时间,但是这个时间是本地机器的时间。

详细解释:
事件时间:事件时间是每个事件在其生产设备上发生的时间。 此时间通常在进入 Flink 之前嵌入记录中,并且可以从每个记录中提取该事件时间戳。在 事件时间,时间的进展取决于数据,而不是任何时钟。事件时间程序必须指定如何生成 事件时间水位线,这是表示事件时间进度的机制。在一个理想的情况下,事件时间处理 将产生完全一致和确定的结果,无论事件何时到达或其排序。但是,除非事件已知按顺 序到达(按时间戳),否则事件时间处理会在等待无序事件时产生一些延迟。由于只能等 待一段有限的时间,因此限制了确定性事件时间应用程序的可能性。假设所有数据都已 到达,事件时间操作将按预期运行,即使在处理无序或延迟事件或重新处理历史数据时 也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有落入该小时的事件 时间戳的所有记录,无论它们到达的顺序如何,或者何时处理它们。注意,有时当事件 时间程序实时处理实时数据时,它们将使用一些处理时间操作,以确保它们及时进行。
提取时间:提取时间是事件进入 Flink 的时间。 在源 operator 处,每个记录将源的当前时间作为时间戳,并且基于时间的操作(如时间 窗口)引用该时间戳。提取时间在概念上位于事件时间和处理时间之间。与处理时间相 比,它稍早一些,但可以提供更可预测的结果。因为提取时间使用稳定的时间戳(在源 处分配一次),所以对记录的不同窗口操作将引用相同的时间戳,而在处理时间中,每个 窗口 operate 可以将记录分配给不同的窗口(基于本地系统时钟和任何运输延误)。与事 件时间相比,提取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生 成水位线。在内部,提取时间与事件时间非常相似,但具有自动时间戳分配和自动水印 生成功能。
处理时间:处理时间是指执行相应操作的机器的系统时间。 当流程序在处理时间运行时,所有基于时间的操作(如时间窗口)将使用相应 operator 的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整个小时之间到达特定 operator 的所有记录。例如,如果应用程序在上午 9:15 开始运行,则第一个每小时处理时间窗口将包括在上午 9:15 到上午 10:00 之间处理的事件,下一个窗口将包括在上午 10:00 到 11:00 之间处理的事件。处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。但是,在分布式和异步环境中,处理时间不提供确 定性,因为它容易受到记录到达系统的速度(例如从消息队列)到记录在系统内的 operator 之间流动的速度的影响,和停电(调度或其他)。
8.2Watermark
流处理从事件产生是先流经 source,再到 operator,中间是有一个过程和时间的, 虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的。
那么此时出现一个问题,一旦出现乱序,如果只根据 EventTime 决定 window 的运行,我们 不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个 特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark。 Flink Watermark 不容易理解,而且和 window 混合,就容易造成混乱了,Watermark 直接翻 译为“水位线”。首先白话 Watermark 的作用,首先我们都知道流式数据,但是流并不是那么顺畅的,比如网络卡等原因,造成数据流乱序,这时候该如何解决,于是产生了 Watermarks。
Watermark 是一种衡量 EventTime 进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的 Watermark。
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。
Watermark 本质上是一个带有时间戳的特殊 event,当 Flink 中的运算符 Operator 接收到 Watermark 时,它明白(假设)它不会看到比该时间戳更早的消息。数据流中的 Watermark用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime - t,那么这个窗口被触发执行。
下显示了带有(逻辑)时间戳和内联水印的事件流。在本例中,事件是按顺序排列的(相对于它们的时间戳),这意味着水印只是流中的周期性标记。

Watermark 对于无序流是至关重要的,如下所示,其中事件不按时间戳排序。通常,Watermark 是一种声明,通过流中的该点,到达某个时间戳的所有事件都应该到达。一旦 Watermark 到达 operator,operator 就可以将其内部事件时间提前到 Watermark 的值。

针对进入窗口的每条数据,计算当前所有达到窗口的数据的最大 eventTime,将这个 eventTime 和延迟时间(Watermark)做减法,差值如果大于某一个窗口的的结束时间,那么该窗口就进行算子操作
有序流的 Watermarker 如下所示:(Watermark 设置为 0)

乱序流的 Watermarker 如下所示:(Watermark 设置为 2)

当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。 由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上中,设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应的 Watermark 是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s5s,窗口2是6s10s, 那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时 的 Watermark 恰好触发窗口 2。
8.3并发 Watermark Watermark
是在源函数处生成的,或直接在源函数之后生成的。源函数的每个并行子任务通常独立生成其 Watermark。这些 Watermark 定义了特定并行源处的事件时间。当 Watermark 通过流程序时,它们会提前到达 Operator 的事件时间。当一个 Operator 提前它的事件时间时,它为它的后续操作符在下游生成一个新的 Watermark。一些 Operator 消耗多个输入流; 例如,一个 union,或者跟随 keyBy(…)或 partition(…)函数的 Operator。这样的 Operator 当前事件时间是其输入流的事件时间的最小值。由于其输入流更新其事件时间,因此 Operator 也是如此。

Flink使用 Watermark 的四个认识
在下面的示例中,我们有一个带时间戳的事件流,这些事件在某种程度上不按顺序到达。 显示的数字是指示实际发生这些事件的时间戳。到达的第一个事件发生在时间 4,然后是之前发生的事件,时间 2,依此类推:

注意,这是事件时间(Event Time)处理的示例,这意味着时间戳反映事件发生的时间,而不是事件被处理的时间。事件时处理是一个抽象,它使得创建流式应用程序成为可能,无论它们是在处理实时数据还是在重新处理历史数据,这些应用程序的行为都是一致的。
现在假设我们正在尝试创建一个流分类器。这意味着应用程序在流到达时处理每个事件,并 emit 包含相同事件但按其时间戳排序的新流。
我们的流分类器看到的第一个元素是 4,但我们不能立即将它作为排序流的第一个元素释放。 它可能已经故障,而早期的事件可能还会到来。 实际上,我们可以从这个流的未来中获得一些内容,我们可以看到我们的流分类器应该至少等到 2 到达之后再产生任何结果。有些缓冲,有些延迟是必要的。
如果出错了(故障),可能会永远等待。 首先,应用程序从第 4 个时间开始看到一个事件,然后从第 2 个时间开始看到一个事件。从早于 2 的时间开始的事件是否会到达?答案是不确定的。我们可以永远等待,永远不会看到 1。 最终,我们必须做决定,并将 2 作为排序流的开始。
我们需要的是某种策略,它定义了对于任何给定的时间戳事件何时停止等待早期事件的到来。这正是 Watermarks 的作用 - 它们定义何时停止等待早期事件。Flink 中的事件时间处理取决于特殊的带时间戳的元素,称为 watermarks,由数据源或 watermarks 生成器插入到流中。 具有时间戳 t 的 watermarks 可以被理解为断言(assertion )所有具有时间戳<t 的事件已经(具有合理的概率)已经到达。什么时候我们的流分类器应该停止等待,
我们可以设想不同的策略来决定如何生成 watermarks。 我们知道每个事件都会在延迟一段时间后到达并且这些延迟会有所不同,因此有些事件会比其他事件延迟更多。一种简单的方法是假设这些延迟受到一些最大延迟的限制。Flink 将此策略称为有界无序 watermarks。很容易想象出更复杂的 watermarks 方法,但对于许多应用来说,固定延迟效果还不错。如果要构建像流分类器这样的应用程序,Flink 的 ProcessFunction 是正确的构建块。它提供对事件时间(event-time)计时器的访问(即, 基于 watermarks 到达而触发的回调),并具有用于管理缓冲事件所需状态的挂钩,直到轮到它们被发送到下游。
9.Flink 内部原理
9.1容错机制
Flink 基于 Checkpoint 机制实现容错,它的原理是不断地生成分布式 Streaming 数据流 Snapshot。在流处理失败时,通过这些 Snapshot 可以恢复数据流处理。理解 Flink 的容错机 制,首先需要了解一下 Barrier 这个概念:
Stream Barrier 是 Flink 分布式 Snapshotting 中的核心元素,它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个 Barrier 会携带一个 Snapshot ID,属于该 Snapshot 的记录会被推向该 Barrier 的前方。因为 Barrier 非常轻量,所以并不会中断数据流。带有 Barrier 的数据流,如下所示:

说明:
出现一个 Barrier,在该 Barrier 之前出现的记录都属于该 Barrier 对应的 Snapshot,在该 Barrier 之后出现的记录属于下一个 Snapshot
来自不同 Snapshot 多个 Barrier 可能同时出现在数据流中,也就是说同一个时刻可能并发生成多个 Snapshot
当一个中间(Intermediate)Operator 接收到一个 Barrier 后,它会发送 Barrier 到属于该 Barrier 的 Snapshot 的数据流中,等到 Sink Operator 接收到该 Barrier 后会向 Checkpoint Coordinator 确认该 Snapshot,直到所有的 Sink Operator 都确认了该 Snapshot,才被认为完 成了该 Snapshot
这里还需要强调的是,Snapshot 并不仅仅是对数据流做了一个状态的 Checkpoint,它也包含了一个 Operator 内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。也就是说,如果一个 Operator 包含任何形式的状态,这种状态必须是 Snapshot 的一部分。
Operator 的状态包含两种:一种是系统状态,一个 Operator 进行计算处理的时候需要对数据进行缓冲,所以数据缓冲区的状态是与 Operator 相关联的,以窗口操作的缓冲区为例, Flink 系统会收集或聚合记录数据并放到缓冲区中,直到该缓冲区中的数据被处理完成;另一种是用户自定义状态(状态可以通过转换函数进行创建和修改),它可以是函数中的 Java 对象这样的简单变量,也可以是与函数相关的 Key/Value 状态。
对于具有轻微状态的 Streaming 应用,会生成非常轻量的 Snapshot 而且非常频繁,但并不会 影响数据流处理性能。Streaming应用的状态会被存储到一个可配置的存储系统中,例如HDFS。 在一个 Checkpoint 执行过程中,存储的状态信息及其交互过程,如下所示:

在 Checkpoint 过程中,还有一个比较重要的操作–Stream Aligning。当 Operator 接收到多个 输入的数据流时,需要在 Snapshot Barrier 中对数据流进行排列对齐,如下所示:

具体排列过程如下:
Operator 从一个 incoming Stream 接收到 Snapshot Barrier n,然后暂停处理,直到其它的 incoming Stream 的 Barrier n(否则属于 2 个 Snapshot 的记录就混在一起了)到达该 Operator
接收到 Barrier n 的 Stream 被临时搁置,来自这些 Stream 的记录不会被处理,而是被放在一个 Buffer 中
一旦最后一个 Stream 接收到 Barrier n,Operator 会 emit 所有暂存在 Buffer 中的记录,然 后向 Checkpoint Coordinator 发送 Snapshot n
继续处理来自多个 Stream 的记录
基于 Stream Aligning 操作能够实现 Exactly Once 语义,但是也会给流处理应用带来延迟,因为为了排列对齐 Barrier,会暂时缓存一部分 Stream 的记录到 Buffer 中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐 Barrier 的一个 Stream 为处理 Buffer 中缓存记录的时刻点。在 Flink 中,提供了一个开关,选择是否使用 Stream Aligning,如果关掉则 Exactly Once 会变成 At least once。
9.2调度机制
在 JobManager 端,会接收到 Client 提交的 JobGraph 形式的 Flink Job,JobManager 会将一个 JobGraph 转换映射为一个 ExecutionGraph,如下所示:

通过上可以看出:
JobGraph 是一个 Job 的用户逻辑视表示,将一个用户要对数据流进行的处理表示为单个 DAG (对应于 JobGraph),DAG 由顶点(JobVertex)和中间结果集(IntermediateDataSet)组成,其中 JobVertex 表示了对数据流进行的转换操作,比如 map、flatMap、filter、keyBy 等操作,而 IntermediateDataSet 是由上游的 JobVertex 所生成,同时作为下游的 JobVertex 的输入。
而 ExecutionGraph 是 JobGraph 的并行表示,也就是实际 JobManager 调度一个 Job 在 TaskManager 上运行的逻辑视,它也是一个 DAG ,是由 ExecutionJobVertex、 IntermediateResult(或 IntermediateResultPartition)组成,ExecutionJobVertex 实际对应于 JobGraph 中的 JobVertex,只不过在 ExecutionJobVertex 内部是一种并行表示,由多个并行 的 ExecutionVertex 所组成。另外,这里还有一个重要的概念,就是 Execution,它是一个 ExecutionVertex 的一次运行 Attempt,也就是说,一个 ExecutionVertex 可能对应多个运行状态的 Execution,比如,一个 ExecutionVertex 运行产生了一个失败的 Execution,然后还会创建一个新的 Execution 来运行,这时就对应这个 2 次运行 Attempt。每个 Execution 通过 ExecutionAttemptID 来唯一标识,在 TaskManager 和 JobManager 之间进行 Task 状态的交换 都是通过 ExecutionAttemptID 来实现的。
下面看一下,在物理上进行调度,基于资源的分配与使用的一个例子,来自官网,如下所示:

说明如下:
左上子:有 2 个 TaskManager,每个 TaskManager 有 3 个 Task Slot。一个 Flink Job,逻辑上包含了 1 个 data source、1 个 MapFunction、1 个 ReduceFunction,对应一个 JobGraph
左下子:用户提交的 Flink Job 对各个 Operator 进行的配置——data source 的并行度设 置为 4,MapFunction 的并行度也为4,ReduceFunction 的并行度为 3,在 JobManager 端对 应于 ExecutionGraph
右上子:TaskManager 1 上,有 2 个并行的 ExecutionVertex 组成的 DAG ,它们各占 用一个 Task Slot
右下子:TaskManager 2 上,也有 2 个并行的 ExecutionVertex 组成的 DAG ,它们也 各占用一个 Task Slot
在 2 个 TaskManager 上运行的 4 个 Execution 是并行执行的
9.3迭代机制
机器学习和计算应用,都会使用到迭代计算,Flink 通过在迭代 Operator 中定义 Step 函数来实现迭代算法,这种迭代算法包括 Iterate 和 Delta Iterate 两种类型,在实现上它们反复地在当前迭代状态上调用 Step 函数,直到满足给定的条件才会停止迭代。下面,对 Iterate 和 Delta Iterate 两种类型的迭代算法原理进行说明:Iterate
Iterate Operator 是一种简单的迭代形式:每一轮迭代,Step 函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为 Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果,具体执行流程如下 所示:

Step 函数在每一轮迭代中都会被执行,它可以是由 map、reduce、join 等 Operator 组成的数据流。下面通过官网给出的一个例子来说明 Iterate Operator,非常简单直观,如下所示:

上面迭代过程中,输入数据为 1 到 5 的数字,Step 函数就是一个简单的 map 函数,会对每个输入的数字进行加 1 处理,而 Next Partial Solution 对应于经过 map 函数处理后的结果, 比如第一轮迭代,对输入的数字 1 加 1 后结果为 2,对输入的数字 2 加 1 后结果为 3,直到 对输入数字 5 加 1 后结果为变为 6,这些新生成结果数字 2~6 会作为第二轮迭代的输入。迭代终止条件为进行 10 轮迭代,则最终的结果为 11~15。
Delta Iterate Operator 实现了增量迭代,它的实现原理如下所示:

基于 Delta Iterate Operator 实现增量迭代,它有 2 个输入,其中一个是初始 Workset,表示输入待处理的增量 Stream 数据,另一个是初始 Solution Set,它是经过 Stream 方向上 Operator 处理过的结果。第一轮迭代会将 Step 函数作用在初始 Workset 上,得到的计算结果 Workset 作为下一轮迭代的输入,同时还要增量更新初始 Solution Set。如果反复迭代知道满足迭代 终止条件,最后会根据 Solution Set 的结果,输出最终迭代结果。
比如,我们现在已知一个 Solution 集合中保存的是,已有的商品分类大类中购买量最多的商 品,而 Workset 输入的是来自线上实时交易中最新达成购买的商品的人数,经过计算会生成新的商品分类大类中商品购买量最多的结果,如果某些大类中商品购买量突然增长,它需要更新 Solution Set 中的结果(原来购买量最多的商品,经过增量迭代计算,可能已经不是最多),最后会输出最终商品分类大类中购买量最多的商品结果集合。
9.4BackPressure 监控
Backpressure 在流式计算系统中会比较受到,因为在一个 Stream 上进行处理的多个 Operator 之间,它们处理速度和方式可能非常不同,所以就存在上游 Operator 如果处理速度过快,下游 Operator 处可能机会堆积 Stream 记录,严重会造成处理延迟或下游 Operator 负载过重而崩溃(有些系统可能会丢失数据)。因此,对下游 Operator 处理速度跟不上的情 况,如果下游 Operator 能够将自己处理状态传播给上游 Operator,使得上游 Operator 处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。 Flink Web 界面上提供了对运行 Job 的 Backpressure 行为的监控,它通过使用 Sampling 线程对正在运行的 Task 进行堆栈跟踪采样来实现,具体实现方式如下所示:

JobManager 会反复调用一个 Job 的 Task 运行所在线程的 Thread.getStackTrace(),默认情况下, JobManager 会每间隔 50ms 触发对一个 Job 的每个 Task 依次进行 100 次堆栈跟踪调用,根据调用调用结果来确定 Backpressure,Flink 是通过计算得到一个比值(Radio)来确定当前运行 Job 的 Backpressure 状态。在 Web 界面上可以看到这个 Radio 值,它表示在一个内部方法调用中阻塞(Stuck)的堆栈跟踪次数,例如,radio=0.01,表示 100 次中仅有 1 次方法调用阻塞。Flink 目前定义了如下 Backpressure 状态:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
另外,Flink 还提供了 3 个参数来配置 Backpressure 监控行为:
参数名称 | 默认值 | 说明 |
jobmanager.web.backpressure.refresh-interval | 60000 | 默认 1 分钟,表示采样统 计结果刷新时间间隔 |
jobmanager.web.backpressure.num-samples | 100 | 评估 Backpressure 状态, 所使用的堆栈跟踪调用次 数 |
jobmanager.web.backpressure.delay-between-samples | 50 | 默认 50 毫秒,表示对一个 Job 的每个 Task 依次调用 的时间间隔 |
通过上面个定义的 Backpressure 状态,以及调整相应的参数,可以确定当前运行的 Job 的状态是否正常,并且保证不影响 JobManager 提供服务
Flink内核原理全解析,大数据时代,且看Flink如何叱咤风云
前言
Flink项目是大数据计算领域冉冉升起的一颗新星。大数据计算引擎的发展经历了几个过程,从第1代的MapReduce,到第2代基于有向无环的Tez,第3代基于内存计算的Spark,再到第4代的Flink。因为Flink可以基于Hadoop进行开发和使用,所以Flink并不会取代Hadoop,而是和Hadoop紧密结合。
Flink主要包括DataStream API、DataSet API、Table API、SQL、Graph API和FlinkML等。现在Flink也有自己的生态圈,涉及离线数据处理、实时数据处理、SQL操作、计算和机器学习库等。

Flink生态圈
Flink原理分析
很多人是在2015年才听到Flink这个词的,其实早在2008年,Flink的前身就已经是柏林理工大学的一个研究性项目,在2014年这个项目被Apache孵化器所接受后,Flink迅速成为ASF(Apache Software Foundation)的顶级项目之一。
Flink是一个开源的流处理框架,它具有以下特点。
分布式:Flink程序可以运行在多台机器上。
高性能:处理性能比较高。
高可用:由于Flink程序本身是稳定的,因此它支持高可用性(High Availability,HA)。
准确:Flink可以保证数据处理的准确性。
Flink主要由Java代码实现,它同时支持实时流处理和批处理。对于Flink而言,作为一个流处理框架,批数据只是流数据的一个极限特例而已。此外,Flink还支持迭代计算、内存管理和程序优化,这是它的原生特性。
Flink特性
支持批处理和数据流程序处理
优雅流畅的支持java和scala api
同时支持高吞吐量和低延迟
支持事件处理和无序处理通过SataStream API,基于DataFlow数据流模型
在不同的时间语义(时间时间,处理时间)下支持灵活的窗口(时间,技术,会话,自定义触发器)
仅处理一次的容错担保
自动反压机制
处理(批) 机器学习(批) 复杂事件处理(流)
在dataSet(批处理)API中内置支持迭代程序(BSP)
高效的自定义内存管理,和健壮的切换能力在in-memory和out-of-core中
兼容hadoop的mapreduce和storm
集成YARN,HDFS,Hbase 和其它hadoop生态系统的组件

Flink的功能特性
在这里解释一下,高吞吐表示单位时间内可以处理的数据量很大,低延迟表示数据产生以后可以在很短的时间内对其进行处理,也就是Flink可以支持快速地处理海量数据。
Flink架构分析
Flink架构可以分为4层,包括Deploy层、Core层、API层和Library层
Deploy层:该层主要涉及Flink的部署模式,Flink支持多种部署模式——本地、集群(Standalone/YARN)和云服务器(GCE/EC2)。
Core层:该层提供了支持Flink计算的全部核心实现,为API层提供基础服务。
API层:该层主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中流处理对应DataStream API,批处理对应DataSet API。
Library层:该层也被称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持FlinkML(机器学习库)、Gelly(处理)、Table 操作。
Flink对底层的一些操作进行了封装,为用户提供了DataStream API和DataSet API。使用这些API可以很方便地完成一些流数据处理任务和批数据处理任务。

Flink架构
Flink基本组件
读者应该对Hadoop和Storm程序有所了解,在Hadoop中实现一个MapReduce需要两个阶段——Map和Reduce,而在Storm中实现一个Topology则需要Spout和Bolt组件。因此,如果我们想实现一个Flink任务的话,也需要有类似的逻辑。
Flink中提供了3个组件,包括DataSource、Transformation和DataSink。
DataSource:表示数据源组件,主要用来接收数据,目前官网提供了readTextFile、socketTextStream、fromCollection以及一些第三方的Source。
Transformation:表示算子,主要用来对数据进行处理,比如Map、FlatMap、Filter、Reduce、Aggregation等。
DataSink:表示输出组件,主要用来把计算的结果输出到其他存储介质中,比如writeAsText以及Kafka、Redis、Elasticsearch等第三方Sink组件。
因此,想要组装一个Flink Job,至少需要这3个组件。
Flink Job=DataSource+Transformation+DataSink
Flink流处理(Streaming)与批处理(Batch)
在大数据处理领域,批处理与流处理一般被认为是两种截然不同的任务,一个大数据框架一般会被设计为只能处理其中一种任务。比如,Storm只支持流处理任务,而MapReduce、Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统,这看似是一个特例,其实不然——Spark Streaming采用了一种Micro-Batch架构,即把输入的数据流切分成细粒度的Batch,并为每一个Batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Storm等完全流式的数据处理方式完全不同。
通过灵活的执行引擎,Flink能够同时支持批处理任务与流处理任务。在执行引擎层级,流处理系统与批处理系统最大的不同在于节点间的数据传输方式。
对于一个流处理系统,其节点间数据传输的标准模型是,在处理完成一条数据后,将其序列化到缓存中,并立刻通过网络传输到下一个节点,由下一个节点继续处理。而对于一个批处理系统,其节点间数据传输的标准模型是,在处理完成一条数据后,将其序列化到缓存中,当缓存写满时,就持久化到本地硬盘上;在所有数据都被处理完成后,才开始将其通过网络传输到下一个节点。

Flink的3种数据传输模型
这两种数据传输模式是两个极端,对应的是流处理系统对低延迟和批处理系统对高吞吐的要求。Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型。
Flink以固定的缓存块为单位进行网络数据传输,用户可以通过设置缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似于前面所提到的流处理系统的标准模型,此时系统可以获得最低的处理延迟;如果缓存块的超时值为无限大,则Flink的数据传输方式类似于前面所提到的批处理系统的标准模型,此时系统可以获得最高的吞吐量。
缓存块的超时值也可以设置为0到无限大之间的任意值,缓存块的超时阈值越小,Flink流处理执行引擎的数据处理延迟就越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量。
Flink典型应用场景分析
Flink主要应用于流式数据分析场景,目前涉及如下领域。
实时ETL:集成流计算现有的诸多数据通道和SQL灵活的加工能力,对流式数据进行实时清洗、归并和结构化处理;同时,对离线数据进行有效的补充和优化,并为数据实时传输提供可计算通道。
实时报表:实时化采集、加工流式数据存储;实时监控和展现业务、客户各类指标,让数据化运营实时化。
监控预警:对系统和用户行为进行实时检测和分析,以便及时发现危险行为。
在线系统:实时计算各类数据指标,并利用实时结果及时调整在线系统的相关策略,在各类内容投放、无线智能推送领域有大量的应用。
Flink在如下类型的公司中有具体的应用。
优化电商网站的实时搜索结果:阿里巴巴的基础设施团队使用Flink实时更新产品细节和库存信息(Blink)。
针对数据分析团队提供实时流处理服务:通过Flink数据分析平台提供实时数据分析服务,及时发现问题。
网络/传感器检测和错误检测:Bouygues电信公司是法国著名的电信供应商,使用Flink监控其有线和无线网络,实现快速故障响应。
商业智能分析ETL:Zalando使用Flink转换数据以便于将其加载到数据仓库,简化复杂的转换操作,并确保分析终端用户可以更快地访问数据(实时ETL)。
流式计算框架对比
Storm是比较早的流式计算框架,后来又出现了Spark Streaming和Trident,现在又出现了Flink这种优秀的实时计算框架,那么这几种计算框架到底有什么区别呢?下面我们来详细分析一下

流式计算框架对比
产品模型API保证次数容错机制状态管理延时吞吐量StormNative(数据进入立即处理)组合式(基础API)At-least-once (至少一次)Record ACK(ACK机制)无低低TridentMicro-Batching(划分为小批 处理)组合式Exactly-once (仅一次)Record ACK基于操作(每次操作有一个状态)中等中等Spark StreamingMicro-Batching声明式(提供封装后的高阶函数,如count函数)Exactly-onceRDD CheckPoint(基于RDD做CheckPoint)基于DStream中等高FlinkNative声明式Exactly-onceCheckPoint(Flink的一种快照)基于操作低高
在这里对这几种框架进行对比。
模型:Storm和Flink是真正的一条一条处理数据;而Trident(Storm的封装框架)和Spark Streaming其实都是小批处理,一次处理一批数据(小批量)。
API:Storm和Trident都使用基础API进行开发,比如实现一个简单的sum求和操作;而Spark Streaming和Flink中都提供封装后的高阶函数,可以直接拿来使用,这样就比较方便了。
保证次数:在数据处理方面,Storm可以实现至少处理一次,但不能保证仅处理一次,这样就会导致数据重复处理问题,所以针对计数类的需求,可能会产生一些误差;Trident通过事务可以保证对数据实现仅一次的处理,Spark Streaming和Flink也是如此。
容错机制:Storm和Trident可以通过ACK机制实现数据的容错机制,而Spark Streaming和Flink可以通过CheckPoint机制实现容错机制。
状态管理:Storm中没有实现状态管理,Spark Streaming实现了基于DStream的状态管理,而Trident和Flink实现了基于操作的状态管理。
延时:表示数据处理的延时情况,因此Storm和Flink接收到一条数据就处理一条数据,其数据处理的延时性是很低的;而Trident和Spark Streaming都是小型批处理,它们数据处理的延时性相对会偏高。
吞吐量:Storm的吞吐量其实也不低,只是相对于其他几个框架而言较低;Trident属于中等;而Spark Streaming和Flink的吞吐量是比较高的。
官网中Flink和Storm的吞吐量对如

Flink和Storm的吞吐量对比
工作中如何选择实时计算框架
前面我们分析了3种实时计算框架,那么公司在实际操作时到底选择哪种技术框架呢?下面我们来分析一下。
需要流数据是否需要进行状态管理,如果是,那么只能在Trident、Spark Streaming和Flink中选择一个。
需要考虑项目对At-least-once(至少一次)或者Exactly-once(仅一次)消息投递模式是否有特殊要求,如果必须要保证仅一次,也不能选择Storm。
对于小型独立的项目,并且需要低延迟的场景,建议使用Storm,这样比较简单。
如果你的项目已经使用了Spark,并且秒级别的实时处理可以满足需求的话,建议使用Spark Streaming
要求消息投递语义为Exactly-once;数据量较大,要求高吞吐低延迟;需要进行状态管理或窗口统计,这时建议使用Flink。
喜欢小编请多多点赞评论转发,你们的支持就是小编最大的动力!!!小编,后续小编会继续努力,为大家带来更多的学习内容分享~~~

Flink CDC 原理、实践和优化
作者:kyledong:腾讯云大数据
出处:https://mp.weixin.qq.com/s?__biz=MzUzNTc0NTcyMw==&mid=2247485657&idx=1&sn=f6fa360a7c35cb6f03e78222ca8691cb
CDC 变更数据捕获技术 可以将 源数据库的增量变动记录,同步到一个或多个数据目的。本文基于 腾讯云 O ceanus 提供的 Flink CDC 引擎, 着重介绍 Flink 在变更数据捕获技术中的应用。
一、CDC 是什么?
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。
例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门需要将每分钟的实时数据简单聚合处理后保存到 Redis 中以供查询,B 部门需要将当天的数据暂存到 Elasticsearch 一份来做报表展示,C 部门也需要一份数据到 ClickHouse 做实时数仓。随着时间的推移,后续 D 部门、E 部门也会有数据分析的需求,这种场景下,传统的拷贝分发多个副本方法很不灵活,而 CDC 可以实现一份变动记录,实时处理并投递到多个目的地。
下是一个示例,通过腾讯云 Oceanus 提供的 Flink CDC 引擎,可以将某个 MySQL 的数据库表的变动记录,实时同步到下游的 Redis、Elasticsearch、ClickHouse 等多个接收端。这样大家可以各自分析自己的数据集,互不影响,同时又和上游数据保持实时的同步。

二、CDC 的实现原理
通常来讲,CDC 分为 主动查询 和 事件接收 两种技术实现模式。
对于主动查询而言,用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。这种方式优点是不涉及数据库底层特性,实现比较通用;缺点是要对业务表做改造,且实时性不高,不能确保跟踪到所有的变更记录,且持续的频繁查询对数据库的压力较大。
事件接收模式可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动;缺点是部署数据库的事件接收和解析器(例如 Debezium、Canal 等),有一定的学习和运维成本,对一些冷门的数据库支持不够。
综合来看,事件接收模式整体在实时性、吞吐量方面占优,如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现,建议使用 Debezium ( https://debezium.io/documentation/reference/1.4/connectors/index.html ) 来实现变更数据的捕获(下来自 Debezium 官方文档 [https://debezium.io/documentation/reference/architecture.html] )。如果使用的只有 MySQL,则还可以用 Canal ( https://github.com/alibaba/canal) 。

三、为什么选 Flink?
从上可以看到,Debezium 官方架构中,是通过 Kafka Streams 直接实现的 CDC 功能。而我们这里更建议使用 Flink CDC 模块,因为 Flink 相对 Kafka Streams 而言,有如下优势:
Flink 的算子和 SQL 模块更为成熟和易用
Flink 作业可以通过调整算子并行度的方式,轻松扩展处理能力
Flink 支持高级的状态后端(State Backends),允许存取海量的状态数据
Flink 提供更多的 Source 和 Sink 等生态支持
Flink 有更大的用户基数和活跃的支持社群,问题更容易解决
Flink 的开源协议允许云厂商进行全托管的深度定制,而 Kafka Streams 只能自行部署和运维
而且 Flink Table / SQL 模块将数据库表和变动记录流(例如 CDC 的数据流)看做是 同一事物的两面 ( https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/dynamic_tables.html) ,因此内部提供的 Upsert 消息结构( +I 表示新增、 -U 表示记录更新前的值、 +U 表示记录更新后的值, -D 表示删除)可以与 Debezium 等生成的变动记录一一对应。
四、Flink CDC 的使用方法
目前 Flink CDC 支持两种数据源输入方式。
(一)输入 Debezium 等数据流进行同步
例如 MySQL -> Debezium -> Kafka -> Flink -> PostgreSQL。适用于已经部署好了 Debezium,希望暂存一部分数据到 Kafka 中以供多次消费,只需要 Flink 解析并分发到下游的场景。

在该场景下,由于 CDC 变更记录会暂存到 Kafka 一段时间,因此可以在这期间任意启动/重启 Flink 作业进行消费;也可以部署多个 Flink 作业对这些数据同时处理并写到不同的数据目的(Sink)库表中,实现了 Source 变动与 Sink 的解耦。
用法示例
例如我们有个 MySQL 数据库,需要实时将内容同步到 PostgreSQL 中。假设已经安装部署好 Debezium 并开始消费 PostgreSQL 的变更日志,这些日志在持续写入名为 YourDebeziumTopic 的 Kafka 主题中。
我们可以新建一个 Flink SQL 作业,然后输入如下 SQL 代码(连接参数都是虚拟的,仅供参考):
CREATE TABLE `Data_Input` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( 'connector' = 'kafka', -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector 'topic' = 'YourDebeziumTopic', -- 替换为您要消费的 Topic 'scan.startup.mode' = 'earliest-offset' -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种 'properties.bootstrap.servers' = '10.0.1.2:9092', -- 替换为您的 Kafka 连接地址 'properties.group.id' = 'YourGroup', -- 必选参数, 一定要指定 Group ID -- 定义数据格式 (Debezium JSON 格式) 'format' = 'debezium-json', 'debezium-json.schema-include' = 'false',); CREATE TABLE `Data_Output` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchema=mySchema&reWriteBatchedInserts=true', -- 请替换为您的实际 PostgreSQL 连接参数 'table-name' = 'MyTable', -- 需要写入的数据表 'username' = 'user', -- 数据库访问的用户名(需要提供 INSERT 权限) 'password' = 'helloworld' -- 数据库访问的密码); INSERT INTO `Data_Output` SELECT * FROM `Data_Input`;
如果在流计算 Oceanus 界面上,可以勾选 kafka 和 jdbc 两个内置的 Connector:

随后直接开始运行作业,Flink 就会源源不断的消费 YourDebeziumTopic 这个 Kafka 主题中 Debezium 写入的记录,然后输出到下游的 MySQL 数据库中,实现了数据同步。
(二)直接对接上游数据库进行同步
我们还可以跳过 Debezium 和 Kafka 的中转,使用 Flink CDC Connectors ( https://github.com/ververica/flink-cdc-connectors ) 对上游数据源的变动进行直接的订阅处理。从内部实现上讲,Flink CDC Connectors 内置了一套 Debezium 和 Kafka 组件,但这个细节对用户屏蔽,因此用户看到的数据链路如下所示:

用法示例
同样的,这次我们有个 MySQL 数据库,需要实时将内容同步到 PostgreSQL 中。但我们没有也不想安装 Debezium 等额外组件,那我们可以新建一个 Flink SQL 作业,然后输入如下 SQL 代码(连接参数都是虚拟的,仅供参考):
CREATE TABLE `Data_Input` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', -- 可选 'mysql-cdc' 和 'postgres-cdc' 'hostname' = '192.168.10.22', -- 数据库的 IP 'port' = '3306', -- 数据库的访问端口 'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限) 'password' = 'hello@world!', -- 数据库访问的密码 'database-name' = 'YourData', -- 需要同步的数据库 'table-name' = 'YourTable' -- 需要同步的数据表名);CREATE TABLE `Data_Output` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchema=mySchema&reWriteBatchedInserts=true', -- 请替换为您的实际 PostgreSQL 连接参数 'table-name' = 'MyTable', -- 需要写入的数据表 'username' = 'user', -- 数据库访问的用户名(需要提供 INSERT 权限) 'password' = 'helloworld' -- 数据库访问的密码);INSERT INTO `Data_Output` SELECT * FROM `Data_Input`;
如果在流计算页面,可以选择内置的 mysql-cdc 和 jdbc Connector:

注意
需要使用 Flink CDC Connectors ( https://github.com/ververica/flink-cdc-connectors) 附加组件。腾讯云 Oceanus 已经自带了 MySQL-CDC Connector,如果自行部署的话,需要 jar 包并将其放入 Flink 的 lib 目录下。
访问数据库时,请确保连接的用户足够权限(PostgreSQL 用户 看这里 [ https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-permissions] ,MySQL 用户 看这里 [https://debezium.io/documentation/reference/connectors/mysql.html#setting-up-mysql] )。
五、Flink CDC 模块的实现
(一)Debezium JSON 格式解析类探秘
flink-json 模块中的 org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory 是负责构造解析 Debezium JSON 格式的工厂类;同样地, org.apache.flink.formats.json.canal.CanalJsonFormatFactory 负责 Canal JSON 格式。这些类已经内置在 Flink 1.11 的发行版中,直接可以使用,无需附加任何程序包。
对于 Debezium JSON 格式而言,Flink 将具体的解析逻辑放在了 org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema#DebeziumJsonDeserializationSchema 类中。

上表示 Debezium JSON 的一条更新(Update)消息,它表示上游已将 id=123 的数据更新,且字段内包含了更新前的旧值,以及更新后的新值。
那么,Flink 是如何解析并生成对应的 Flink 消息呢?我们看下这个类的 deserialize 方法:
GenericRowData before = (GenericRowData) payload.getField(0); // 更新前的数据GenericRowData after = (GenericRowData) payload.getField(1); // 更新后的数据String op = payload.getField(2).toString(); // 获取 "op" 字段的类型if (OP_CREATE.equals(op) || OP_READ.equals(op)) { // 如果是创建 (c) 或快照读取 (r) 消息 after.setRowKind(RowKind.INSERT); // 设置消息类型为新建 (+I) out.collect(after); // 发送给下游} else if (OP_UPDATE.equals(op)) { // 如果是更新 (u) 消息 before.setRowKind(RowKind.UPDATE_BEFORE); // 把更新前的数据类型设置为撤回 (-U) after.setRowKind(RowKind.UPDATE_AFTER); // 把更新后的数据类型设置为更新 (+U) out.collect(before); // 发送两条数据给下游 out.collect(after);} else if (OP_DELETE.equals(op)) { // 如果是删除 (d) 消息 before.setRowKind(RowKind.DELETE); // 将消息类型设置为删除 (-D) out.collect(before); // 发送给下游} else { ... // 异常处理逻辑}
从上述逻辑可以看出,对于每一种 Debezium 的操作码( op 字段的类型),都可以用 Flink 的 RowKind 类型来表示。对于插入 +I 和删除 D ,都只需要一条消息即可;而对于更新,则涉及删除旧数据和写入新数据,因此需要 -U 和 +U 两条消息来对应。
特别地,在 MySQL、PostgreSQL 等支持 Upsert(原子操作的 Update or Insert)语义的数据库中,通常前一个 -U 消息可以省略,只把后一个 +U 消息用作实际的更新操作即可,这个优化在 Flink 中也有实现。
因此可以看到,Debezium 到 Flink 消息的转换逻辑是非常简单和自然的,这也多亏了 Flink 先进的设计理念,很早就提出并实现了 Upsert 数据流和动态数据表之间的映射关系。
1.Flink CDC Connectors 的实现
(1)flink-connector-debezium 模块
我们在使用 Flink CDC Connectors 时,也会好奇它究竟是如何做到的不需要安装和部署外部服务就可以实现 CDC 的。当我们阅读 flink-connector-mysql-cdc 的源码时,可以看到它内部依赖了 flink-connector-debezium 模块,而这个模块将 Debezium Embedded ( https://github.com/debezium/debezium/tree/master/debezium-embedded ) 嵌入到了 Connector 中。
flink-connector-debezium 的数据源实现类为 com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction ,它集成了 Flink 中的 RichSourceFunction 并实现了 CheckpointedFunction 以支持快照保存状态。
通常而言,对于 SourceFunction,我们可以从它的 run 方法入手分析。它的核心代码如下:
this.engine = DebeziumEngine.create(Connect.class) .using(properties) // 初始化 Debezium 所需的参数 .notifying(debeziumConsumer) // 收到批量的变更消息, 则 Debezium 会回调 DebeziumChangeConsumer 来反序列化并向下游输出数据 .using(OffsetCommitPolicy.always()) .using( (success, message, error) -> { if (!success && error != null) { this.reportError(error); } }) .build();... executor.execute(engine); // 向 Executor 提交 Debezium 线程以启动运行
可以看到,这个 SourceFunction 使用一些预先定义的参数,初始化了一个嵌入式的 DebeziumEngine(Java 的 Runnable ),然后提交给线程池(executor)去执行。这个 Debezium 线程会批量接收 binlog 信息并回调传入的 debeziumConsumer 以反序列化消息并交给 Flink 来处理。本类的其他方法主要负责初始化状态和保存快照,这里略过。
这里我们再来看一下 DebeziumChangeConsumer 的实现,它的最核心的方法是 handleBatch 。当 Debezium 收到一批新的事件时,会调用这个方法来通知我们的 Connector 进行处理。这里有个 for 循环轮询的逻辑:
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) { // 轮询各个事件 SourceRecord record = event.value(); if (isHeartbeatEvent(record)) { // 如果时心跳包 // 只更新当前 offset 信息, 然后继续(不进行实际处理) synchronized (checkpointLock) { debeziumOffset.setSourcePartition(record.sourcePartition()); debeziumOffset.setSourceOffset(record.sourceOffset()); } continue; } deserialization.deserialize(record, debeziumCollector); // 反序列化这条消息 if (isInDbSnapshotPhase) { // 如果处于数据库快照期, 需要阻止 Flink 检查点(Checkpoint)生成 if (!lockHold) { MemoryUtils.UNSAFE.monitorEnter(checkpointLock); lockHold = true; ... } if (!isSnapshotRecord(record)) { // 如果已经不在数据库快照期了, 就释放锁, 允许 Flink 正常生成检查点(Checkpoint) MemoryUtils.UNSAFE.monitorExit(checkpointLock); isInDbSnapshotPhase = false; ... } } // 更新当前 offset 信息, 并向下游 Flink 算子发送数据 emitRecordsUnderCheckpointLock( debeziumCollector.records, record.sourcePartition(), record.sourceOffset());}
可以看到逻辑比较简单,只需要 checkpointLock 这个对象:只有持有这个对象的锁时,才允许 Flink 进行检查点的生成。
当作业处于数据库快照期(即作业刚启动时,需全量同步源数据库的一份完整快照,此时收到的数据类型是 Debezium 的 SnapshotRecord ),则不允许 Flink 进行 Checkpoint 即检查点的生成,以避免作业崩溃恢复后状态不一致;同样地,如果正在向下游算子发送数据并更新 offset 信息时,也不允许快照的进行。这些操作都是为了保证 Exacly-Once(精确一致)语义。
这里也解释了在作业刚启动时,如果数据库较大(同步时间较久),Flink 刚开始的 Checkpoint 永远失败(超时)的原因:只有当 Flink 完整同步了全量数据后,才可以进行增量数据的处理,以及 Checkpoint 的生成。
(2)flink-connector-mysql-cdc 模块
而对于 flink-connector-mysql-cdc 模块而言,它主要涉及到 MySQLTableSource 的声明和实现。
我们知道,Flink 是通过 Java 的 SPI(Service Provider Interface)机制动态加载 Connector 的,因此我们首先看这个模块的 src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 文件,里面内容指向 com.alibaba.ververica.cdc.connectors.mysql.table.MySQLTableSourceFactory 。
打开这个工厂类,我们可以看到它定义了该 Connector 所需的参数,例如 MySQL 数据库的用户名、密码、表名等信息,并负责 MySQLTableSource 实例的具体创建,而 MySQLTableSource 类对这些参数做转换,最终会生成一个上文提到的 DebeziumSourceFunction 对象。
因此我们可以发现,这个模块作用是一个 MySQL 参数的封装和转换层,最终的逻辑实现仍然是由 flink-connector-debezium 完成的。
六、MySQL CDC 常见问题&优化
由于 Flink 的 CDC 功能还比较新(1.11 版本刚开始支持,1.12 版本逐步完善),因而在应用过程中,很可能会遇到有各种问题。鉴于大多数客户的数据源都是 MySQL,我们这里整理了客户常见的一些问题和优化方案,希望能够帮助到大家。
Debezium 报错:binlog probably contains events generated with statement or mixed based replication format
当前的 Binlog 格式被设置为了 STATEMENT 或者 MIXED , 这两种都不被 Debezium 支持。为了使用 Flink CDC 功能,需要把 MySQL 的 binlog-format 设置为 ROW :
SET GLOBAL binlog_format = 'ROW';SET GLOBAL binlog_row_image = 'FULL';
如果您使用的是腾讯云的 TencentDB for MySQL,请确认下面设置:

Debezium 报错:User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot 或 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s)
请对作业中指定的 MySQL 用户赋予如下权限: SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ,例如:
GRANT SELECT , RELOAD, SHOW DATABASES , REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO '用户名' IDENTIFIED BY '密码' ;
FLUSH PRIVILEGES ;
如果您使用的数据库不允许或者不希望使用 RELOAD 进行全局锁,则还需要授予 LOCK TABLES 权限以令 Debezium 尝试进行表级锁。 注意,表级锁会导致更长的数据库锁定时间!
如果希望彻底跳过锁(对数据的一致性要求不高,但要求数据库不能被锁),则可以在 WITH 参数中设置 'debezium.snapshot.locking.mode' = 'none' 参数来跳过锁操作。但请注意,同步过程中千万不要随意变更库表的结构。
作业刚启动期间,Flink Checkpoint 一直失败/重启
前文讲过,Flink CDC Connector 在初始的全量快照同步阶段,会屏蔽掉快照的执行,因此如果 Flink Checkpoint 需要执行的话,就会因为一直无法获得 checkpointLock 对象的锁而超时。
可以设置 Flink 的 execution.checkpointing.tolerable-failed-checkpoint 参数以容忍更多的 Checkpoint 失败事件,同时可以调大 Checkpoint 周期,避免作业因 Checkpoint 失败而一直重启。
JDBC Sink 批量写入时,数据会缺失几条
如果发现数据库中的某些数据在 CDC 同步后有缺失,请确认是否仍在使用 Flink 旧版 1.10 的 Flink SQL WITH 语法(例如 WITH 参数中的 connector.type 是 旧语法 [ https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector] , connector 是 新语法 [ https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table] )。
旧版语法的 Connector 在 JDBC 批量写入 Upsert 数据(例如数据库的更新记录)时,并未考虑到 Upsert 与 Delete 消息之间的顺序关系,因此会出现错乱的问题,请尽快迁移到新版的 Flink SQL 语法。
异常数据造成作业持续重启
默认情况下,如果遇到异常的数据(例如消费的 Kafka topic 在无意间混入了其他数据),Flink 会立刻崩溃重启,然后从上个快照点(Checkpoint)重新消费。由于某条异常数据的存在,作业会永远因为异常而重启。可以在 WITH 参数中加入 'debezium-json.ignore-parse-errors' = 'true' 来应对这个问题。
上游 Debezium 崩溃导致写入重复数据,结果不准
Debezium 服务端发生异常并恢复后,由于可能没有及时记录崩溃前的现场,可能会退化为 At least once 模式,即同样的数据可能被发送多次,造成下游结果不准确。
为了应对这个问题,新版的 Flink 1.12 增加了一个 table.exec.source.cdc-events-duplicate 配置项(可以 flink-conf.yaml 文件来配置),建议将其设置为 true 以对这些重复数据进行去重。
但是需要注意,该选项需要数据源表 定义了主键 ,否则也无法进行去重操作。
七、未来展望
在 Flink 1.11 版本中,CDC 功能首次被集成到内核中。由于 Flink 1.11.0 版本有个 严重 Bug ( https://issues.apache.org/jira/browse/FLINK-18461 ) 造成 Upsert 数据无法写入下游,我们建议使用 1.11.1 及以上版本。
在 1.12 版本上,Flink 还在配置项中增加了前文提到的 table.exec.source.cdc-events-duplicate 等选项以更好地支持 CDC 去重;还支持 Avro 格式的 Debezium 数据流,而不仅仅限于 JSON 了。另外,这个版本增加了对 Maxwell ( https://maxwells-daemon.io/) 格式的 CDC 数据流支持,
为了更好地完善 CDC 功能模块,Flink 社区创建了 [FLINK-18822] 以追踪关于该模块的进展。可以从中看到,Flink 1.13 主要着力于支持更多的类型( FLINK-18758 [ https://issues.apache.org/jira/browse/FLINK-18758 ] ),以及允许从 Debezium Avro、Canal 等数据流中读取一些元数据信息等。
而在更远的规划中,Flink 还可能支持基于 CDC 的内存数据库缓存,这样我们可以在内存中动态地 JOIN 一个数据库的副本,而不必每次都查询源库,这将极大地提升作业的处理能力,并降低数据库的查询压力。
作者:kyledong
:腾讯云大数据
出处:https://mp.weixin.qq.com/s?__biz=MzUzNTc0NTcyMw==&mid=2247485657&idx=1&sn=f6fa360a7c35cb6f03e78222ca8691cb
- 1国行PS5京东双12活动开启:光驱版售价4199元!(ps5大概多少钱)
- 2苏州华东装饰城拆迁了吗,江苏省苏州市市场监管局2022年家用燃气灶具产品质量监督抽查情况公告(第39期)
- 399p2pzone,五一假期升级WiFi 关注哪些无线路由器?
- 4室内空气污染物有哪些?不只是甲醛和PM2.5!附送解决方法(室内空气污染物主要有哪些)
- 5放假通知来了!今早热搜刷屏,网友都在关注它……(五一放假几天2022年法定几天)
- 6色狼网论坛,天津网警成功断掉一大型色情网站
- 7新手爸妈必看,出生证明办理攻略来啦(亲子关系证明怎么开,其范本是怎样的?)
- 8公立医院内部审计存在问题及建议(关于公立医院改革试点的指导意见内部审计)
- 9软著代理费一般多少钱,版权中心改革政策已落地!计算机软件著作权申请成本翻几倍增加
发表观点(0条)