Flink ProcessFunction

阅读 39

2022-04-06

Flink ProcessFunction

文章目录

之前使用的转换算子是无法访问事件的时间戳信息和水位线信息的,在某些应用场景下,我们需要这些信息。因此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件(比如将筛选数据、超时事件等)。ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑,之前使用的window函数和转换算子无法实现。比如,FlinkSQL就是使用ProcessFunction实现的。

Flink提供了8个Process Function:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

其中最常用的是KeyedProcessFunction。

一、 KeyedProcessFunction

KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()close()getRuntimeContext()等方法。而KeyedProcessFunction<K, I, O>还额外提供了两个方法:

  • processElement(I value, Context ctx, Collector<O> out),流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的 key ,以及TimerService 时间服务。 Context 还可以将结果输出到别的流(side outputs)。
  • onTimer(long timestamp, OnTimerContext ctx, Collector<O> out),是一个回调函数。当之前注册的定时器触发时调用。参数timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext和processElement的Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

测试代码:

设置一次获取数据后第5s给出提示信息的定时器

package com.root.process;

import com.root.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author Kewei
 * @Date 2022/3/7 9:27
 */

public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);

        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        SingleOutputStreamOperator<Long> resultStream = dataStream.keyBy("id").process(new MyKeyedProcess());

        resultStream.print();

        env.execute();
    }

    public static class MyKeyedProcess extends KeyedProcessFunction<Tuple, SensorReading, Long>{
        ValueState<Long> tsTemp;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTemp = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts",Long.class));
        }

        @Override
        public void close() throws Exception {
            tsTemp.clear();
        }

        @Override
        public void processElement(SensorReading value, KeyedProcessFunction<Tuple, SensorReading, Long>.Context context, Collector<Long> out) throws Exception {
            out.collect((long) value.getId().length());

            context.getCurrentKey();
            context.timestamp();

            context.timerService().currentProcessingTime();
            context.timerService().currentWatermark();

            context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + 5000L);
            tsTemp.update(context.timerService().currentProcessingTime() + 1000L);

            context.timerService().deleteProcessingTimeTimer(tsTemp.value());
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Tuple, SensorReading, Long>.OnTimerContext ctx, Collector<Long> out) throws Exception {
            System.out.println(timestamp + " 定时器触发");
            ctx.getCurrentKey();

            ctx.timeDomain();
        }
    }
}

输出

8
1646622646820 定时器触发

二、TimerService和定时器(Timers)

Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:

  • long currentProcessingTime() 返回当前处理时间
  • long currentWatermark() 返回当前watermark 的时间戳
  • void registerProcessingTimeTimer( long timestamp) 会注册当前key的processing time的定时器。当processing time 到达定时时间时,触发timer
  • void registerEventTimeTimer(long timestamp) 会注册当前key 的event time 定时器。当Watermark水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
  • void deleteProcessingTimeTimer(long timestamp) 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
  • void deleteEventTimeTimer(long timestamp) 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

当定时器timer触发时,会调用回调函数onTimer()。注意定时器timer只能在keyedStreams上面使用。

测试代码:

监控温度传感器的温度值,如果温度值在10秒钟之内连续上升,则报警。

package com.root.process;

import com.root.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author Kewei
 * @Date 2022/3/7 9:51
 */

public class ProcessTest2_ApplicationCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // KeyBy之前使用“id”可以分组,这这里无法运行,必须使用方法引用SensorReading::getId
        // 使用process可以做获取当前的id,运行时间戳、以及水位watermark
        dataStream.keyBy(SensorReading::getId)
                .process(new MyKeyedProcess(Time.seconds(10).toMilliseconds()))
                .print();

        env.execute();
    }

    public static class MyKeyedProcess extends KeyedProcessFunction<String, SensorReading,String>{
        // 设置预警的时间
        private Long interval;
        public MyKeyedProcess(Long interval){
            this.interval = interval;
        }

        // 设置值状态,保存上一次的温度
        ValueState<Double> lastTemp;

        // 设置值状态,保存最近一次预警的时间
        ValueState<Long> recentTimerTimeStamp;

        // 设置预警时调用的函数
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<String, SensorReading, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // getCurrentKey()获取当前key,out.collect设置输出内容
            out.collect("传感器 :"+ctx.getCurrentKey()+"温度值连续"+interval+"ms上升");

            // 预警之后清楚最近一次预警的时间
            recentTimerTimeStamp.clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // 初始化两个状态
            lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("double",Double.class));
            recentTimerTimeStamp = getRuntimeContext().getState(new ValueStateDescriptor<Long>("long",Long.class));
        }

        @Override
        public void close() throws Exception {
            // 清除两个状态的值
            lastTemp.clear();
            recentTimerTimeStamp.clear();
        }

        @Override
        public void processElement(SensorReading value, KeyedProcessFunction<String, SensorReading, String>.Context ctx, Collector<String> out) throws Exception {
            // 获取当前时间的温度
            Double curTemp = value.getTemperature();
            // 获取前一次的温度
            Double lastTemper = lastTemp.value();

            // 判断前一次的温度是否存在,若不存在使用当前温度,若存在使用本身,便于之后的比较
            lastTemper = lastTemper==null?curTemp:lastTemper;

            // 获取最近一次预警的时间
            Long timerTime = recentTimerTimeStamp.value();

            // 判断当前温度是否大于前一次温度,并且最近一次预警的时间为空(说明还没有预警)
            // 若都满足,设置一个预警时间,注册一个触发器,并更新最近一次预警时间
            if (curTemp>lastTemper && null == timerTime){
                long warningTimer = ctx.timerService().currentProcessingTime() + interval;
                ctx.timerService().registerProcessingTimeTimer(warningTimer);
                recentTimerTimeStamp.update(warningTimer);
            }

            // 若当前温度小于等于前一次温度,以及前一次预警时间不为空(说明已经预警了)
            // 若都满足,就是说温度下降了,需要把触发器删除,以及清除预警时间
            if (curTemp <= lastTemper && timerTime != null){
                ctx.timerService().deleteProcessingTimeTimer(timerTime);
                recentTimerTimeStamp.clear();
            }

            // 更新上次温度
            lastTemp.update(curTemp);
        }
    }
}

输出

传感器 :sensor_1温度值连续10000ms上升

三、侧输出流(SideOutput)

一个数据可以被多个window包含,只有其不被window包含的时候(包含该数据的所有window都关闭之后),才会被丢到侧输出流。

如果一个数据被丢到侧输出流,那么所有包含该数据的window都由于已经超过了“允许的迟到的时间”而关闭了,进而新来的迟到数据只能被丢到测输出流


  • 大部分的DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。
  • processfunction 的side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。
  • 一个side output 可以定义为OutputTag[X]对象,X 是输出流的数据类型。
  • processfunction 可以通过Context 对象发射一个事件到一个或者多个side outputs。

测试代码:

将温度≥30放入高温输出,反正放入低温测流输出。

package com.root.process;

import com.root.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author Kewei
 * @Date 2022/3/7 10:47
 */

public class ProcessTest3_SideOutCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定义一个OutputTag,用来表示侧输出流低温流
        // An OutputTag must always be an anonymous inner class
        // OutputTag必须始终是匿名的内部类
        // so that Flink can derive a TypeInformation for the generic type parameter.
        // 这样Flink就可以导出泛型类型参数的类型信息
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("lowTemp") {
        };

        SingleOutputStreamOperator<SensorReading> resultStream = dataStream.keyBy(SensorReading::getId)
                .process(new ProcessFunction<SensorReading, SensorReading>() {
                    @Override
                    public void processElement(SensorReading value, ProcessFunction<SensorReading, SensorReading>.Context ctx, Collector<SensorReading> out) throws Exception {
                        Double curTemp = value.getTemperature();

                        // 若温度大于30,正常输出,否则将数据写道测流中,context.output
                        if (curTemp > 30.0) {
                            out.collect(value);
                        } else {
                            ctx.output(outputTag, value);
                        }
                    }
                });

        resultStream.print("high-temp");

        // 获取测流的数据
        resultStream.getSideOutput(outputTag).print("low-temp");

        env.execute();
    }
}

输出

high-temp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
low-temp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=29.5}

四、CoProcessFunction

  • 对于两条输入流,DataStream API 提供了CoProcessFunction 这样的low-level操作。CoProcessFunction 提供了操作每一个输入流的方法: processElement1()processElement2()
  • 类似于ProcessFunction,这两种方法都通过Context 对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。
  • CoProcessFunction 也提供了onTimer()回调函数

精彩评论(0)

0 0 举报