0
点赞
收藏
分享

微信扫一扫

【Flink】【第五章 流处理API】UDF函数

phpworkerman 2022-01-13 阅读 97
flink

实现UDF函数——更细粒度的控制流

函数类(Function Classes)

我们可以先分清一下概念:

算子:
将flink中的addSource、Transform算子、addSink都为算子

函数:

  • 算子中的参数;
  • 也就是各种各样的Function就是函数,这些函数基本上都是接口,也有抽象类;
  • 函数类的调用是针对DataStream中的每一个数据的,也就是每个元素都要调用一下。
  • Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等,他们都是Function接口的子接口
  • Function接口实现了序列化。

在这里插入图片描述

匿名函数(Lambda Functions)

就是Java8 Lambda表达式写法

DataStream<String> tweets = env.readTextFile("INPUT_FILE");

DataStream<String> flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );

富函数(Rich Functions)☆

富函数介绍

富函数是抽象类,继承了AbstractRichFunction,实现了常规函数接口

public abstract class RichFlatMapFunction<IN, OUT> 
extends AbstractRichFunction implements FlatMapFunction<IN, OUT>
  • 富函数是DataStream API提供的一个函数类的接口;
  • ProcessFunction API是Flink中最底层的API,功能最强大,因为ProcessFunction 都是RichFunction的实现类,所以RichFunction能干的事情,ProcessFunction都能做。

RichFuntion接口是Function接口的子接口, Flink的所有函数都有”富函数”版本,比如:

  • MapFunction =>RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

  • 这些富函数一方面继承了AbstractRichFunction,一方面实现了普通的Function接口
    在这里插入图片描述

富函数的功能

富函数比常规函数多了两个功能

  • 获取运行环境的上下文
  • 生命周期方法

注意: 在用普通Function的时候,都是实现接口,但是在编写富函数的时候,用的都是继承,RichFunction接口下面都是抽象类
在这里插入图片描述

Rich Function抽象类所具有的典型的生命周期方法

在这里插入图片描述

  • open()方法:rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
  • close()方法:是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext()方法:获取函数的运行时上下文RuntimeContext,从中可以获取函数的信息:例如函数执行的并行度,任务的名字,以及状态编程(算子的状态)

富函数的生命周期方法提供了一种全局的视角,而非局限于单个元素,生命周期方法是对整个DataStream调用一次 [更正:生命周期方法是基于并行度的,并非整个dataStream只调用一次,可以参考 [第9章9.1.3例子中的报错],而富函数中的执行函数是对每一个元素调用一次的,因此可以提供一些建立在DataStream上的功能,比如获取第三方框架的连接

public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {
    @Override
    public Tuple2<Integer, String> map(SensorReading value) throws Exception {
        return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("my map open");
        // 以下可以做一些初始化工作,例如建立一个和HDFS的连接
    }

    @Override
    public void close() throws Exception {
        System.out.println("my map close");
        // 以下做一些清理工作,例如断开和HDFS的连接
    }
}
    @Test
    public void map() throws Exception {
        source = env.readTextFile("D:\\IdeaProjects\\bigdata\\flink\\src\\main\\resources\\sensort.txt");
        SingleOutputStreamOperator<SensorReading> map = source.map(new MyMapFunction());

        SingleOutputStreamOperator<Tuple2<Integer, String>> map1 = map.map(new MyMapRichFunction());

        map1.print();

        env.execute();

        //my map open
        //my map open
        //my map open
        //my map open
        //3> (2,sensor_1)
        //2> (1,sensor_1)
        //1> (0,sensor_7)
        //4> (3,sensor_1)
        //my map close
        //3> (2,sensor_6)
        //my map close
        //my map close
        //1> (0,sensor_10)
        //my map close

        
    }

富函数使用场景

1.获取第三方框架的连接 关闭连接
2.获取上下文环境 进行状态编程

举报

相关推荐

0 条评论