StreamExecutionEnvironment
ExecutionConfig
并行度 Parallelism
- 有默认值,等于cpu 数量
 - getter and setter方法
 - 最大并行度: 0 < maxParallelism <= 2^15 - 1
 
重启策略 RestartStrategies
- setRestartStrategy()
 - getRestartStrategy()
 
链化优化 isChainingEnabled
- isChainingEnabled
 - disableOperatorChaining
 
重试次数 numberOfExecutionRetries
- setNumberOfExecutionRetries
 - getNumberOfExecutionRetries
 
TimeCharacteristic
默认为处理时间
-  
setStreamTimeCharacteristic(时间类型)
- 处理时间时AutoWatermarkInterval水印间隔为0ms
 - 事件时间时AutoWatermarkInterval水印间隔为200ms
 
 
CheckpointConfig
-  
checkpointingMode :EXACTLY_ONCE、AT_LEAST_ONCE
- getCheckpointingMode()
 
 -  
checkpointTimeout
 -  
CheckpointInterval
- enableCheckpointing
 
 -  
forceCheckpointing
- isForceCheckpointing
 - setForceCheckpointing
 
 -  
PauseBetweenCheckpoints
- minPauseBetweenCheckpoints
 - maxPauseBetweenCheckpoints
 
 -  
maxConcurrentCheckpoints
 -  
failOnCheckpointingErrors
- isFailOnCheckpointingErrors
 
 -  
ExternalizedCheckpointCleanup
允许外部持久化检查点
枚举值有:
- DELETE_ON_CANCELLATION:job取消时删除持久化的检查点和状态。取消作业后,无法从外部的检查点恢复。
 - RETAIN_ON_CANCELLATION:job取消时保留持久化的检查点和状态,取消作业后,必须手动删除删除检查点元数据和状态
 
方法:
enableExternalizedCheckpoints(cleanupMode) 设置检查点持久化存储的模式
isExternalizedCheckpointsEnabled() 是否允许持久化到外部存储。
getExternalizedCheckpointCleanup() 返回持久化外部存储的模式
 
StateBackend
-  
描述如何存储和设置算子状态和检查点
 -  
定义了在执行期间 检查点数据和状态将会被持久化的数据结构(可以是hashtable、RockDB、其他数据存储)
 
主要方法
创建environment
- StreamExecutionEnvironment.getExecutionEnvironment
 - StreamExecutionEnvironment.createLocalEnvironment
 - StreamExecutionEnvironment.createRemoteEnvironment
 
数据源
基于集合的数据源
generateSequence
env.generateSequence(long from,long to) 生成从from到to的数字序列,并将序列数据作为输入数据源。如果并行度设置为1,则生成的元素序列是有序的。
fromElements
从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。并行度为1。
对于不确定的泛型类,可能需要显示指定 TypeInformation。
fromCollection
从一个给定的非空集合或者迭代器中创建数据流,并行度为1。
fromParallelCollection
创建一个包含了可拆分的迭代器中元素的数据流,允许框架创建一个并行数据流来返回迭代器中的元素。
迭代器在实际执行之前不会被修改,因此迭代器返回的数据类型必须以Type类的形式显示给出(因为JAVA编译器会删除泛型信息)
基于文件的数据源
readTextFile
- 一行一行的读取给定的文件,并创建一个数据流,包含一个包含每行内容的字符串。将使用系统的默认字符集或者指定字符集读取文件。
 - readTextFile调用了
readFile(TextInputFormat(路径, 字符集, 分隔符), 文件处理模式.处理一次, 时间间隔, 以文本类型读取文件)方法 - readFile()方法调用了
createFileInput(TextInputFormat(路径, 字符集, 分隔符),typeInformation, 文件处理模式, 时间间隔)方法。 - createFileInput调用addSource()方法。
 
readFile
按照指定的文件格式TextInputFormat读取文件。
createFileInput
createFileInput(TextInputFormat(路径, 字符集, 分隔符),typeInformation, 文件处理模式, 时间间隔)
// 创建一个以传入时间为间隔的定期文件扫描监控器
ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
// 创建一个ready算子
ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);
// 创建输入源,并发度读取environment中的并发配置。
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName).transform("Split Reader: " + sourceName, typeInfo, reader);
 
FileProcessingMode有两种,
- PROCESS_ONCE 处理路径上的当前内容,然后退出。
 - PROCESS_CONTINUOUSLY 定期扫描路径以获取新内容。
 
createInput
根据TextInputFormat创建一个输入流的通用方法
基于Socket的数据源
socketTextStream
从一个socket中创建一个包含了接收到的无线的字符串的数据流,接收到的字符串是被系统默认decode的。socket本身不会报告终止,结果就是只有socket 优雅终止时会启动重试。
socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
 
addSource 添加数据源
添加一个数据源到数据流的拓扑图中。
默认并发为1,如果想要并发执行,可以实现ParallelSourceFunction 或者继承RichParallelSourceFunction
 function – the user defined function
 sourceName – Name of the data source
 typeInfo – the user defined type information for the stream
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
	if (typeInfo == null) {
		if (function instanceof ResultTypeQueryable) {
			typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
		} else {
			try {
				typeInfo = TypeExtractor.createTypeInfo(
						SourceFunction.class,
						function.getClass(), 0, null, null);
			} catch (final InvalidTypesException e) {
				typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
			}
		}
	}
	boolean isParallel = function instanceof ParallelSourceFunction;
	clean(function);
	StreamSource<OUT, ?> sourceOperator;
	if (function instanceof StoppableFunction) {
		sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
	} else {
		sourceOperator = new StreamSource<>(function);
	}
	return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
 
任务运行
execute
触发程序执行。环境将会执行程序的sink算子的所有上下游。sink算子有两种,打印结果集或者发送结果集到消息队列中。
可以显示指定jobName,默认为“Flink Streaming Job”.
getStreamGraph
获取流式任务的streamGraph。
- 取environment中的 状态后端,是否链式优化,环境上下文 作为参数构造一个流程图
 - 将sinks算子之前的transformList进行迭代,在迭代中设置streamGraph的bufferTimeout、transofrmationUID、transformationUserHash、source
 
getExecutionPlan
-  
运行程序是创建一个计划,返回执行的data flow graph。
 -  
这个方法在计划执行之前,需要被调用。
 -  
返回JSON字符串。
 
cast2StoppableSourceFunction
clean
返回给定函数的“closure cleaned”版本。仅当ExecutionConfig中未禁用闭包清理时才进行清理
addOperator
添加一个算子到转换list中。List<StreamTransformation<?>> transformations
并不一定是用户调用,创建算子的API方法必须调用这个方法。
registerCacheFile
在分布式缓存中注册一个文件,这个文件在运行的时候将会被所有的用户自定义的function 作为一个本地localPath访问。
文件可以是本地文件,也可以是分布式文件。如果有必要,运行时将会临时拷贝文件到本地缓存中。










