实现UDF函数——更细粒度的控制流
函数类(Function Classes)
我们可以先分清一下概念:
算子:
将flink中的addSource、Transform算子、addSink都为算子
函数:
- 算子中的参数;
- 也就是各种各样的Function就是函数,这些函数基本上都是接口,也有抽象类;
- 函数类的调用是针对
DataStream
中的每一个数据的,也就是每个元素都要调用一下。 - Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等,他们都是Function接口的子接口
- Function接口实现了序列化。
匿名函数(Lambda Functions)
就是Java8 Lambda表达式写法
DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );
富函数(Rich Functions)☆
富函数介绍
富函数是抽象类,继承了AbstractRichFunction,实现了常规函数接口
public abstract class RichFlatMapFunction<IN, OUT>
extends AbstractRichFunction implements FlatMapFunction<IN, OUT>
- 富函数是DataStream API提供的一个函数类的接口;
ProcessFunction API
是Flink中最底层的API,功能最强大,因为ProcessFunction
都是RichFunction
的实现类,所以RichFunction
能干的事情,ProcessFunction
都能做。
RichFuntion接口是Function接口的子接口, Flink的所有函数都有”富函数”版本,比如:
- MapFunction =>RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
这些富函数一方面继承了AbstractRichFunction
,一方面实现了普通的Function接口
:
富函数的功能
富函数比常规函数多了两个功能
- 获取运行环境的上下文
- 生命周期方法
注意: 在用普通Function的时候,都是实现接口,但是在编写富函数的时候,用的都是继承,RichFunction接口下面都是抽象类
Rich Function抽象类所具有的典型的生命周期方法
- open()方法:rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
- close()方法:是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext()方法:获取函数的运行时上下文RuntimeContext,从中可以获取函数的信息:例如函数执行的并行度,任务的名字,以及状态编程(算子的状态)
富函数的生命周期方法提供了一种全局的视角,而非局限于单个元素,生命周期方法是对整个DataStream调用一次 [更正:生命周期方法是基于并行度的,并非整个dataStream只调用一次,可以参考 [第9章9.1.3例子中的报错],而富函数中的执行函数是对每一个元素调用一次的,因此可以提供一些建立在DataStream上的功能,比如获取第三方框架的连接
public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {
@Override
public Tuple2<Integer, String> map(SensorReading value) throws Exception {
return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
}
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("my map open");
// 以下可以做一些初始化工作,例如建立一个和HDFS的连接
}
@Override
public void close() throws Exception {
System.out.println("my map close");
// 以下做一些清理工作,例如断开和HDFS的连接
}
}
@Test
public void map() throws Exception {
source = env.readTextFile("D:\\IdeaProjects\\bigdata\\flink\\src\\main\\resources\\sensort.txt");
SingleOutputStreamOperator<SensorReading> map = source.map(new MyMapFunction());
SingleOutputStreamOperator<Tuple2<Integer, String>> map1 = map.map(new MyMapRichFunction());
map1.print();
env.execute();
//my map open
//my map open
//my map open
//my map open
//3> (2,sensor_1)
//2> (1,sensor_1)
//1> (0,sensor_7)
//4> (3,sensor_1)
//my map close
//3> (2,sensor_6)
//my map close
//my map close
//1> (0,sensor_10)
//my map close
}
富函数使用场景
1.获取第三方框架的连接 关闭连接
2.获取上下文环境 进行状态编程