文章目录
一、window 概念
Keyed Windows
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
 
Non-Keyed Windows
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
 
一般真实的流都是无界的,怎样处理无界的数据?
- 可以把无限的数据流进行切分,得到有限的数据集进行处理 —— 也
就是得到有界流 - 窗口(window)就是将无限流切割为有限流的一种方式,它会将流
数据分发到有限大小的桶(bucket)中进行分析 
二、 时间窗口(Time Window)
官方文档
1)滚动窗口(Tumbling Windows)
【特点】
- 将数据依据固定的窗口长度对数据进行切分
 - 时间对齐,窗口长度固定,没有重叠
 
【示例代码】
val input: DataStream[T] = ...
// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)
// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
 
2)滑动窗口(Sliding Windows)
例如,您可以将大小为10分钟的窗口滑动5分钟。这样,每隔5分钟就会出现一个窗口,其中包含在最后10分钟内到达的事件,如下图所示:
 
 【特点】
- 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口
长度和滑动间隔组成 - 窗口长度固定,可以有重叠
 
【示例代码】
val input: DataStream[T] = ...
// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)
// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
 
3)会话窗口(Session Windows)
【特点】
- 由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是
一段时间没有接收到新数据就会生成新的窗口 - 时间无对齐
 - 窗口长度不固定,也不会重叠
 
【示例代码】
val input: DataStream[T] = ...
// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
 
三、window API
- 我们可以用 
.window()来定义一个窗口,然后基于这个 window 去做一些聚
合或者其它处理操作。注意 window () 方法必须在keyBy之后才能用。 - Flink 提供了更加简单的三种类型时间窗口用于定义时
间窗口,也提供了countWindowAll来定义计数窗口。 
四、窗口分配器(window assigner)
1)增量聚合函数(incremental aggregation functions)
- 每条数据到来就进行计算,保持一个简单的状态
 - ReduceFunction
 
val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
 
- AggregateFunction
 
val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)
 
2)全窗口函数(full window functions)
- 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
 - ProcessWindowFunction
 
一个ProcessWindowFunction可以这样定义和使用:
val input: DataStream[(String, Long)] = ...
input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}
 
3)其它可选window API
- .trigger() —— 触发器,定义 window 什么时候关闭,触发计算并输出结果
 - .evictor() —— 移除器,定义移除某些数据的逻辑
 - .allowedLateness() —— 允许处理迟到的数据
 - .sideOutputLateData() —— 将迟到的数据放入侧输出流
 - .getSideOutput() —— 获取侧输出流
 
五、Flink 中的时间语义
官方文档
 Flink 明确支持以下三种时间语义:
-  
事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间
 -  
摄取时间(ingestion time): 数据进入Flink的时间,Flink 读取事件时记录的时间
 -  
处理时间(processing time):执行操作算子的本地系统时间,与机器相关
 

上面图片来源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/
六、设置 Event Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
七、水位线(Watermark)
官方文档
1)为什么需要水位线(Watermark)
2)如何利用Watermark处理乱序数据问题?
- Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发;
 - Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用
Watermark 机制结合 window 来实现; - 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,
都已经到达了,因此,window 的执行也是由 Watermark 触发的; - watermark 用来让程序自己平衡延迟和结果正确性。
 
3)watermark 的特点

- watermark 是一条特殊的数据记录
 - watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不
是在后退 - watermark 与数据的时间戳相关
 
4)watermark 的传递

5)watermark 策略与应用
1)Watermark 策略简介
使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:
public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
    /**
     * 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
    /**
     * 根据策略实例化一个 watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
 
 
【例如】你想要要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:
WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
  })
 
 
2)使用 Watermark 策略应用
WatermarkStrategy 可以在 Flink 应用程序中的两处使用:
- 第一种是直接在数据源上使用
 - 第二种是直接在非数据源的操作之后使用。
 
【示例】仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy):
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>)
withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)
 
【示例】处理空闲数据源
WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withIdleness(Duration.ofMinutes(1))
 
3)使用场景
- 对于排好序的数据,不需要延迟触发,可以只指定时间戳就行了。
 
// 注意时间是毫秒,所以根据时间戳不同,可能需要乘以1000
dataStream.assignAscendingTimestamps(_.timestamp * 1000)
 
- Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如
何从事件数据中抽取时间戳和生成watermark。 
// MyAssigner 可以有两种类型,都继承自 TimestampAssigner
dataStream.assignAscendingTimestamps(new MyAssigner())
 
4)TimestampAssigner
1、AssignerWithPeriodicWatermarks
- 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中
 - 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval()
方法进行设置 - 升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性
watermark 的。 
2、AssignerWithPunctuatedWatermarks
- 没有时间周期规律,可打断的生成 watermark
 
可以弃用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了
未完待续~










