0
点赞
收藏
分享

微信扫一扫

【FLink源码分析】:Accumulator源码分析


文章目录

  • ​​Accumulator源码分析学习​​
  • ​​Accumulator源码结构​​
  • ​​Accumulator 结构​​
  • ​​成员方法​​
  • ​​SimpleAccumulator​​
  • ​​累加器使用案例​​
  • ​​code​​

Accumulator源码分析学习

Accumulator源码结构

Flink 的Accumulator是一个累加器,和spark的累加器是一个道理

Accumulator 结构

public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
}

成员方法

  • add
  • getLocalValue
  • resetLocal
  • merge
  • clone

【FLink源码分析】:Accumulator源码分析_Accumulator

SimpleAccumulator

源码中SimpleAccumulator接口继承了累加器,供上层的其他累加器实现这个接口,然后实现具体逻辑

@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T,T> {}

  1. ​IntCounter,LongCounter,DoubleCounter​​:允许将 TaskManager 发送的 int,long,double 值汇总在一起
  2. ​LongMaximum,LongMinimum,IntMaximum,IntMinimum,DoubleMaximum,DoubleMinimum​​:累加器,用于确定不同类型的最大值和最小值

【FLink源码分析】:Accumulator源码分析_Accumulator_02

源码中可以看出有很多具体的不同功能的实现类都实现了这个接口

累加器使用案例

code

lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

//创建一个累加器
private IntCounter linesNum = new IntCounter();

@Override
public void open(Configuration parameters) throws Exception {
//注册一个累加器
getRuntimeContext().addAccumulator("linesNum", linesNum);
}

@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = line.split("\\W+");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}

// 处理每一行数据后 linesNum 递增
linesNum.add(1);
}
})
.groupBy(0)
.sum(1)
.print();

//获取累加器结果
int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
System.out.println(linesNum);


举报

相关推荐

0 条评论