Flink 窗口连续使用

阅读 133

2022-04-23

1、需求：基于事件时间，按 key 分区，用 5 秒滚动窗口求每个 key 在窗口内的和；之后再用 windowAll 创建一个不分区的 10 秒滚动窗口（并非 GlobalWindows 全局窗口），求这些和中的最大值。

2、代码实现

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Chains two event-time windows in Flink.
 *
 * <p>Stage 1 reads lines of the form {@code "<timestamp>,<key>,<value>"} from a socket, keys
 * them by {@code key}, and sums {@code value} per 5-second tumbling event-time window. It emits
 * {@code (windowStart, windowEnd, key, sum)}.
 *
 * <p>Stage 2 feeds those per-key sums into a non-keyed ({@code windowAll}) 10-second tumbling
 * event-time window. It emits the complete record that holds the largest sum.
 */
public class Demo {

    /**
     * Builds and runs the two-stage window job.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each line is expected to be "<epochMillis>,<key>,<value>", e.g. "100000,a,1".
        // A malformed line makes parseLong/parseInt throw, which fails the job.
        DataStreamSource<String> line = env.socketTextStream("localhost", 8888);

        // Timestamps and watermarks are assigned directly on the source stream, before any
        // transformation. Out-of-orderness is zero, so the watermark trails the largest timestamp
        // seen by 1 ms. Records that arrive after their window has fired are dropped as late.
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> mapStream = line
                .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String value, long recordTimestamp) {
                                return Long.parseLong(value.split(",")[0]);
                            }
                        }))
                .map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
                    @Override
                    public Tuple3<Long, String, Integer> map(String s) throws Exception {
                        String[] fields = s.split(",");
                        return new Tuple3<>(Long.parseLong(fields[0]), fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // Stage 1: for each key, sum field f2 over 5-second tumbling event-time windows.
        SingleOutputStreamOperator<Tuple4<Long, Long, String, Integer>> windowStream = mapStream
                .keyBy(e -> e.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Tuple3<Long, String, Integer>, Tuple4<Long, Long, String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Tuple3<Long, String, Integer>> elements, Collector<Tuple4<Long, Long, String, Integer>> out) throws Exception {
                        int sum = 0;
                        for (Tuple3<Long, String, Integer> element : elements) {
                            sum += element.f2;
                        }
                        TimeWindow window = context.window();
                        out.collect(new Tuple4<>(window.getStart(), window.getEnd(), key, sum));
                    }
                });

        windowStream.print("sum=>");

        // Stage 2: across all keys, pick the largest sum.
        // Records emitted by stage 1 carry their window's max timestamp (end - 1). Both window
        // sizes are aligned to the epoch, so every 5-second window falls entirely inside one
        // 10-second window here.
        //
        // maxBy(3) returns the whole record that holds the largest sum. Positional max(3) would
        // only guarantee field 3. Its window bounds and key would be copied from the first record
        // the aggregator received, which can belong to a different key.
        windowStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .maxBy(3)
                .print("max=>");

        env.execute();
    }
}

3、数据测试
/**

  • 输入数据:
  • 100000,a,1
  • 100001,b,1
  • 100002,c,1
  • 100003,a,10
  • 100004,b,5
  • 100005,c,1
  • 第一个窗口 [100000,105000) 触发（输入 105005 后水位线推进到 105004，不小于该窗口的最大时间戳 104999）
  • 105005,d,2
  • 105006,a,10
  • 105007,b,4
  • 105008,c,3
  • 120000,a,1
  • 第二个窗口 [105000,110000) 和 max 窗口 [100000,110000) 触发（输入 120000 后水位线推进到 119999）
  • 输出数据:
  • sum=>:4> (100000,105000,c,2)
  • sum=>:6> (100000,105000,a,11)
  • sum=>:2> (100000,105000,b,6)
  • sum=>:5> (105000,110000,d,2)
  • sum=>:4> (105000,110000,c,3)
  • sum=>:2> (105000,110000,b,4)
  • sum=>:6> (105000,110000,a,10)
  • max=>:4> (100000,105000,a,11)
    */

精彩评论(0)

0 0 举报