文章目录
一、概述
- DataSet API:对静态数据进行批处理作业,将静态数据抽象成分布式的数据集,用户可以方便的使用Flink提供的各种操作符对分布式数据集进行处理,支持Java,Scala和python;
- DataStream API:对数据流进行流处理作业,将流式的数据抽象成分布式的数据流,用户可以方面的对分布式数据流进行各种操作,支持Java,scala和python;
- Table API:对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过SQL的DSL对关系表进行各种查询操作,支持Java和Scala;
- SQL:SQL查询是使用TableEnvironment的sqlquery()方法执行的,该方法以SQL的形式返回SQL查询的结果。Table可以在后续的SQL和Table API查询中使用,可以转换诶DataSet和DataStream,也可以写入TableSink。SQL和Table API可以无缝的整合,进行整体优化并转换为单个程序。要访问SQL中查询的表,必须在TableEnvironment中注册他,可以从TableSource,Table,DataStream和DataSet注册表,用户也可以在TableEnvironment中注册外部目录以制定数据源的位置。Blink开源后,将使Flink SQL更加完善稳定。
- StateFul Stream Processing:最低级抽象只提供有状态流,通过Process Function嵌入到DataStream API中,它允许用户自由处理来自一个或者多个流的时间,并使用一致的容错状态,此外用户可以注册event time和processing time回调,允许程序实现复杂的计算。
二、Flink工作原理
Flink的基础架构图:
-
JobClient:负责接收程序,解析和优化程序的执行计划,然后提交执行计划到JobManager。这里执行的程序优化是将相邻的Operator融合,形成Operator Chain,Operator的融合可以减少task的数量,提高TaskManager的资源利用率。为了了解Flink的解析过程,需要简单介绍一下Flink的Operator,在Flink主要有三类Operator:
- Source Operator :顾名思义这类操作一般是数据来源操作,比如文件、socket、kafka等,一般存在于程序的最开始
- Transformation Operator: 这类操作主要负责数据转换,map,flatMap,reduce等算子都属于Transformation Operator,
- Sink Operator:意思是下沉操作,这类操作一般是数据落地,数据存储的过程,放在Job最后,比如数据落地到Hdfs、Mysql、Kafka等等。
-
JobManagers:负责申请资源,协调以及控制整个job的执行过程,具体包括,调度任务、处理checkpoint、容错等等。
-
TaskManager:TaskManager运行在不同节点上的JVM进程,负责接收并执行JobManager发送的task,并且与JobManager通信,反馈任务状态信息,如果说JobManager是master的话,那么TaskManager就是worker用于执行任务。每个TaskManager像是一个容器,包含一个或者多个Slot。
-
Slot:Slot是TaskManager资源粒度的划分,每个Slot都有自己独立的内存。所有Slot平均分配TaskManager的内存,值得注意的是,Slot仅划分内存,不涉及CPU的划分,即CPU是共享使用。每个Slot可以运行多个task。Slot的个数就代表了一个程序的最高并行度。
-
Task:Task是在operators的subtask进行链化之后形成的,具体Flink job中有多少task和operator的并行度和链化的策略有关。
-
SubTask:因为Flink是分布式部署的,程序中的每个算子,在实际执行中被分隔为一个或者多个subtask,运算符子任务(subtask)的数量是该特定运算符的并行度。数据流在算子之间流动,就对应到SubTask之间的数据传输。Flink允许同一个job中来自不同task的subtask可以共享同一个slot。每个slot可以执行一个并行的pipeline。可以将pipeline看作是多个subtask的组成的。
三、Flink核心概念
1)Time(时间语义)
-
事件时间:是事件真实发生的时间。
-
达到时间:是系统接收到事件的时间,即服务端接收到事件的时间。
-
处理时间:是系统开始处理到达事件的时间。
Flink 中只需要通过 env 环境变量即可设置Time:
//创建环境上下文
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置在当前程序中使用 ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
2)Window(窗口)
-
Time Window:基于时间的,分为Tumbling Window(无数据重叠)和Sliding Window(有数据重叠) 。
-
Count Window:基于数量的,分为Tumbling Window(无数据重叠)和Sliding Window(有数据重叠)。
-
Session Window:基于会话的,一个session window关闭通常是由于一段时间没有收到元素。
-
Global Window:全局窗口。
3) Trigger
1、自定义触发器
- onElement(): 每个被添加到窗口中的元素都会被调用
- onEventTime():当事件时间定时器触发时会被调用,比如watermark到达
- onProcessingTime():当处理时间定时器触发时会被调用,比如时间周期触发
- onMerge():当两个窗口合并时两个窗口的触发器状态将会被调动并合并
- clear():执行需要清除相关窗口的事件
以上方法会返回决定如何触发执行的 TriggerResult:
- CONTINUE:什么都不做
- FIRE:触发计算
- PURGE:清除窗口中的数据
- FIRE_AND_PURGE:触发计算后清除窗口中的数据
2、预定义触发器
4)State
- Managed State 是由flink runtime管理来管理的,自动存储、自动恢复,在内存管理上有优化机制。且Managed State 支持常见的多种数据结构,如value、list、map等,在大多数业务场景中都有适用之处。总体来说是对开发人员来说是比较友好的,因此 Managed State 是 Flink 中最常用的状态。Managed State 又分为 Keyed State 和 Operator State 两种。
- Raw State 由用户自己管理,需要序列化,只能使用字节数组的数据结构。Raw State 的使用和维度都比 Managed State 要复杂,建议在自定义的Operator场景中酌情使用。
5)状态存储
1、MemoryStateBackend
-
构造函数:MemoryStateBackend(int maxStateSize, boolean asyncSnapshot)
-
存储方式:State存储于各个 TaskManager内存中,Checkpoint存储于 JobManager内存
-
容量限制:单个State最大5M、maxStateSize<=akka.framesize(10M)、总大小不超过JobManager内存
-
使用场景:无状态或者JobManager挂掉不影响的测试环境等,不建议在生产环境使用
2、FsStateBackend
-
构造函数:FsStateBackend(URI checkpointUri, boolean asyncSnapshot)
-
存储方式:State存储于 TaskManager内存,Checkpoint存储于 外部文件系统(本次磁盘 or HDFS)
-
容量限制:State总量不超过TaskManager内存、Checkpoint总大小不超过外部存储空间
-
使用场景:常规使用状态的作业,分钟级的窗口聚合等,可在生产环境使用
3、RocksDBStateBackend
-
构造函数:RocksDBStateBackend(URI checkpointUri, boolean enableincrementCheckpoint)
-
存储方式:State存储于 TaskManager上的kv数据库(内存+磁盘),Checkpoint存储于 外部文件系统(本次磁盘 or HDFS)
-
容量限制:State总量不超过TaskManager内存+磁盘、单key最大2g、Checkpoint总大小不超过外部存储空间
-
使用场景:超大状态的作业,天级的窗口聚合等,对读写性能要求不高的场景,可在生产环境使用
根据业务场景需要用户选择最合适的 StateBackend ,代码中只需在相应的 env 环境中设置即可:
// flink 上下文环境变量
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置状态后端为 FsStateBackend,数据存储到 hdfs /tmp/flink/checkpoint/test 中
env.setStateBackend(new FsStateBackend("hdfs://ns1/tmp/flink/checkpoint/test", false))
6)Checkpoint
用户只需在相应的 env 环境中设置即可:
// 1000毫秒进行一次 Checkpoint 操作
env.enableCheckpointing(1000)
// 模式为准确一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 两次 Checkpoint 之间最少间隔 500毫秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 过程超时时间为 60000毫秒,即1分钟视为超时失败
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一时间只允许1个Checkpoint的操作在执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
1、Asynchronous Barrier Snapshots(ABS)
异步屏障快照算法,这个算法基本上是Chandy-Lamport算法的变体,针对DAG(有向无环图)的ABS算法执行流程如下所示:
- Barrier周期性的被注入到所有的Source中,Source节点看到Barrier后,会立即记录自己的状态,然后将Barrier发送到Transformation Operator。
- 当Transformation Operator从某个input channel收到Barrier后,它会立刻Block住这条通道,直到所有的input channel都收到Barrier,这个等待的过程就叫做屏障对齐(barrier alignment),此时该Operator就会记录自身状态,并向自己的所有output channel广播Barrier。
- Sink接受Barrier的操作流程与Transformation Oper一样。当所有的Barrier都到达Sink之后,并且所有的Sink也完成了Checkpoint,这一轮Snapshot就完成了。
下面这个图展示了一个ABS算法的执行过程:
2、Exactly-Once vs At-Least-Once
- 上面讲到的屏障对齐过程是Flink exactly-once语义的基础,因为屏障对齐能够保证多输入流的算子正常处理不同checkpoint区间的数据,避免它们发生交叉,即不会有数据被处理两次。
- 但是对齐过程需要时间,有一些对延迟特别敏感的应用可能对准确性的要求没有那么高。所以Flink也允许在StreamExecutionEnvironment.enableCheckpointing()方法里指定At-Least-Once语义,会取消屏障对齐,即算子收到第一个输入的屏障之后不会阻塞,而是触发快照。这样一来,部分属于检查点n + 1的数据也会包括进检查点n的数据里, 当恢复时,这部分交叉的数据就会被重复处理。
7)Watermark
1、自定义数据源设置 Timestamp/Watermark
自定义的数据源类需要继承并实现 SourceFunction[T] 接口,其中 run 方法是定义数据生产的地方:
//自定义的数据源为自定义类型MyType
class MySource extends SourceFunction[MyType]{
//重写run方法,定义数据生产的逻辑
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
//设置timestamp从MyType的哪个字段获取(eventTimestamp)
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
//设置watermark从MyType的那个方法获取(getWatermarkTime)
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
}
2、在数据流中设置 Timestamp/Watermark
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
8)广播状态(Broadcast State)
-
和 Spark 中的广播变量一样,Flink 也支持在各个节点中各存一份小数据集,所在的计算节点实例可在本地内存中直接读取被广播的数据,可以避免Shuffle提高并行效率。
-
广播状态(Broadcast State)的引入是为了支持一些来自一个流的数据需要广播到所有下游任务的情况,它存储在本地,用于处理其他流上的所有传入元素。
// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream.keyBy(new KeySelector<Shape, Color>(){...});
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
DataStream<Match> output = colorPartitionedStream.connect(ruleBroadcastStream).process(new KeyedBroadcastProcessFunction<Color, Item, Rule, String>(){...});
9)Operator Chain
-
Flink作业中,可以指定相关的chain将相关性非常强的转换操作(operator)绑定在一起,使得上下游的Task在同一个Pipeline中执行,避免因为数据在网络或者线程之间传输导致的开销。
-
一般情况下Flink在Map类型的操作中默认开启 Operator Chain 以提高整体性能,开发人员也可以根据需要创建或者禁止 Operator Chain 对任务进行细粒度的链条控制。
//创建 chain
dataStream.filter(...).map(...).startNewChain().map(...)
//禁止 chain
dataStream.map(...).disableChaining()
10)Side Output
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// 将数据发送到常规输出中
out.collect(value);
// 将数据发送到侧输出中
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
四、对比常用的实时计算框架
- Flink 是有状态的和容错的,可以在维护一次应用程序状态的同时无缝地从故障中恢复。
- 它支持大规模计算能力,能够在数千个节点上并发运行。
- 它具有很好的吞吐量和延迟特性。
- 同时,Flink 提供了多种灵活的窗口函数。
- Flink 在流式计算里属于真正意义上的单条处理,每一条数据都触发计算,而不是像 Spark 一样的 Mini Batch 作为流式处理的妥协。
- Flink的容错机制较为轻量,对吞吐量影响较小,而且拥有图和调度上的一些优化,使得 Flink 可以达到很高的吞吐量。
- 而 Strom 的容错机制需要对每条数据进行ack,因此其吞吐量瓶颈也是备受诟病。
五、Flink环境部署(Flink On Yarn)
下载地址:http://flink.apache.org/downloads.html
1)Local模式
Local Cluster模式是开箱即用的,直接解压安装包,然后启动即可。
$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
# 解压
$ tar -zxvf flink-1.14.2-bin-scala_2.12.tgz -C /opt/bigdata/hadoop/server/
# 进入bin目录运行启动脚本
$ cd /opt/bigdata/hadoop/server/flink-1.14.2
$ ./bin/start-cluster.sh
打开浏览器输入http://IP:8081,查看WEBUI监控界面
我这里地址:http://hadoop-node1:8081
2)Standalone模式
1、机器及角色划分
机器IP | 机器名 | 节点类型 |
---|---|---|
192.168.0.113 | hadoop-node1 | Master |
192.168.0.114 | hadoop-node2 | Worker |
192.168.0.115 | hadoop-node3 | Worker |
1、下载
$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
# 解压
$ tar -zxvf flink-1.14.2-bin-scala_2.12.tgz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/flink-1.14.2
2、修改配置文件
- 修改flink-conf.yaml文件
$ cd /opt/bigdata/hadoop/server/flink-1.14.2/conf
$ vi flink-conf.yaml
## jobmanager节点地址,也是master节点地址
jobmanager.rpc.address: hadoop-node1
- 修改masters文件
把默认的localhost:8081删掉,添加如下内容:
hadoop-node1:8081
- 修改workers文件,内容如下:
把默认的localhost删掉,添加如下内容:
hadoop-node2
hadoop-node3
3、将安装目录复制到另外两台节点
$ cd /opt/bigdata/hadoop/server
$ scp -r flink-1.14.2 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r flink-1.14.2 hadoop-node3:/opt/bigdata/hadoop/server/
4、配置环境变量,修改/etc/profile
在/etc/profile文件中添加如下内容(所有节点):
export FLINK_HOME=/opt/bigdata/hadoop/server/flink-1.14.2
export PATH=$PATH:$FLINK_HOME/bin
使配置文件生效
$ source /etc/profile
5、启动集群
$ start-cluster.sh
3)On Yarn模式(推荐)
On Yarn官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/yarn/
FLink on yarn 有两种运行模式:
- 创建一个yarn-session,后续提交应用至该session
- 直接 yarn cluster模式提交应用至yarn服务
下载
$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
# 解压
$ tar -zxvf flink-1.14.2-bin-scala_2.12.tgz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/flink-1.14.2
配置
在/etc/profile文件中追加如下内容:
export FLINK_HOME=/opt/bigdata/hadoop/server/flink-1.14.2
export PATH=$PATH:$FLINK_HOME/bin
# 上面两句如果加过,可以忽略
export HADOOP_CLASSPATH=`hadoop classpath`
加载配置
$ source /etc/profile
1、yarn-session模式
实验
- 【第一步】开源资源,使用如下命令:
$ yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
### 参数解释:
#-n 2 表示指定两个容器
# -jm 1024 表示jobmanager 1024M内存
# -tm 1024表示taskmanager 1024M内存
#-d 任务后台运行
### 如果你不希望flink yarn client一直运行,也可以启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached。在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。注意:在这种情况下,无法使用flink停止yarn session,必须使用yarn工具来停止yarn session。
# yarn application -kill $applicationId
#-nm,--name YARN上为一个自定义的应用设置一个名字
#-q,--query 显示yarn中可用的资源 (内存, cpu核数)
#-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
#-id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
JobManager Web Interface: http://hadoop-node2:41787,端口是随机的。
通过yarn入口访问flink
- 【第二步】提交任务
为了进行测试,我们对Flink目录下的LICENSE文件进行词频统计
$ cd $FLINK_HOME
$ hadoop fs -put LICENSE /
$ hadoop fs -ls /LICENSE
# 提交任务
$ flink run ./examples/batch/WordCount.jar -input hdfs://hadoop-node1:8082/LICENSE -output hdfs://hadoop-node1:8082/wordcount-result.txt
再提交一次任务
$ flink run ./examples/batch/WordCount.jar -input hdfs://hadoop-node1:8082/LICENSE -output hdfs://hadoop-node1:8082/wordcount-result2.txt
发现现在已经有两个跑完的任务了,但是只有一个flink集群,从而验证了yarn-session模式
2、yarn-cluster模式(推荐)
实验
$ cd $FLINK_HOME
$ flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
# 查看帮助
$ flink --help
### 参数详解,这里只列出了部分参数
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Set to yarn-cluster to use YARN
execution mode.
-yat,--yarnapplicationType <arg> Set a custom application type for the
application on YARN
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
发现查看不了History,是因为没起History服务,下面启动这个服务
historyserver简介与配置
配置historyserver
$ cd $FLINK_HOME/bin
# 选创建目录
$ hdfs://hadoop-node1:8082/flink/completed-jobs/
# conf/flink-conf.yaml
# 指定由JobManager归档的作业信息所存放的目录,这里使用的是HDFS
jobmanager.archive.fs.dir: hdfs://hadoop-node1:8082/flink/completed-jobs/
# 指定History Server扫描哪些归档目录,多个目录使用逗号分隔
historyserver.archive.fs.dir: hdfs://hadoop-node1:8082/flink/completed-jobs/
# 指定History Server间隔多少毫秒扫描一次归档目录
historyserver.archive.fs.refresh-interval: 10000
# History Server所绑定的ip,hadoop-node1代表允许所有ip访问
historyserver.web.address: hadoop-node1
# 指定History Server所监听的端口号,默认端口是8082
historyserver.web.port: 9082
启动historyserver
$ ./historyserver.sh start
$ jps
web:http://hadoop-node1:9082
重新跑一次任务
$ flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
查看正在运行任务的日志,yarn容器退出之后,就下面入口就访问不了了。
六、Spark与Flink对比
Flink执行流程图如下:
对比维度 | Spark | Flink |
---|---|---|
设计理念 | Spark的技术理念是使用微批来模拟流的计算,基于Micro-batch,数据流以时间为单位被切分为一个个批次,通过分布式数据集RDD进行批量处理,是一种伪实时。 | Flink是基于事件驱动的,是面向流的处理框架, Flink基于每个事件一行一行地流式处理,是真正的实时流式计算, 另外他也可以基于流来模拟批进行计算实现批处理。 |
架构方面 | Spark在运行时的主要角色包括:Master、Worker、Driver、Executor。 | Flink 在运行时主要包含:Jobmanager、Taskmanager和Slot。 |
任务调度 | Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,根据DAG中的action操作形成job,每个job有根据窄宽依赖生成多个stage。 | 使用DataStream API开发的应用程序,首先被转换为Transformation,再被映射为StreamGraph,在客户端进行StreamGraph、JobGraph的转换,提交JobGraph到Flink集群后,Flink集群负责将JobGraph转换为ExecutionGraph,之后进入调度执行阶段。 |
时间机制 | Spark Streaming 支持的时间机制有限,只支持处理时间。使用processing time模拟event time必然会有误差, 如果产生数据堆积的话,误差则更明显。 | flink支持三种时间机制:事件时间,注入时间,处理时间,同时支持 watermark 机制处理迟到的数据,说明Flink在处理乱序大实时数据的时候,更有优势。 |
容错机制 | SparkStreaming的容错机制是基于RDD的容错机制,会将经常用的RDD或者对宽依赖加Checkpoint。利用SparkStreaming的direct方式与Kafka可以保证数据输入源的,处理过程,输出过程符合exactly once。 | Flink 则使用两阶段提交协议来保证exactly once。 |
吞吐量与延迟 | spark是基于微批的,而且流水线优化做的很好,所以说他的吞入量是最大的,但是付出了延迟的代价,它的延迟是秒级。 | 而Flink是基于事件的,消息逐条处理,而且他的容错机制很轻量级,所以他能在兼顾高吞吐量的同时又有很低的延迟,它的延迟能够达到毫秒级。 |
Flink原理介绍先写到这里了,更多关于Flink的知识点,请您耐心等待,当然也可以先自行去看官方文档:https://nightlies.apache.org/flink/。