文章目录
- Accumulator源码分析学习
- Accumulator源码结构
- Accumulator 结构
- 成员方法
- SimpleAccumulator
- 累加器使用案例
- code
Accumulator源码分析学习
Accumulator源码结构
Flink 的Accumulator是一个累加器,和spark的累加器是一个道理
Accumulator 结构
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
}
成员方法
- add
- getLocalValue
- resetLocal
- merge
- clone
SimpleAccumulator
源码中SimpleAccumulator接口继承了累加器,供上层的其他累加器实现这个接口,然后实现具体逻辑
@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T,T> {}
-
IntCounter,LongCounter,DoubleCounter
:允许将 TaskManager 发送的 int,long,double 值汇总在一起 -
LongMaximum,LongMinimum,IntMaximum,IntMinimum,DoubleMaximum,DoubleMinimum
:累加器,用于确定不同类型的最大值和最小值
源码中可以看出有很多具体的不同功能的实现类都实现了这个接口
累加器使用案例
code
lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
//创建一个累加器
private IntCounter linesNum = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
//注册一个累加器
getRuntimeContext().addAccumulator("linesNum", linesNum);
}
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = line.split("\\W+");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
// 处理每一行数据后 linesNum 递增
linesNum.add(1);
}
})
.groupBy(0)
.sum(1)
.print();
//获取累加器结果
int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
System.out.println(linesNum);