状态(State)是一个重要的概念,它允许Flink在处理流数据时跟踪和存储中间结果。这对于实现复杂的计算逻辑和满足应用需求至关重要。
Working with State
1. 状态类型
Flink支持两种主要类型的状态:
1.1 算子状态(Operator State)
- **定义:**算子状态是与特定算子实例绑定的状态,即一个算子的状态不能被其他算子访问。
- **特性:**与并发的算子实例绑定,假设算子并行度为N,则存在N个对应的算子状态。
1.2 键控状态(Keyed State)
- 定义:键控状态是基于键(Key)的状态,用于存储与每个键相关的数据信息。
- 使用场景:键控状态只能在KeyedStream上使用,通过stream.keyBy(…)获得KeyedStream。
- 分类:
- ValueState:存储单个值的状态。
- ListState:存储列表类型的状态。
- MapState:存储键值对的状态。
- ReducingState:存储经过ReduceFunction计算后的结果。
- AggregatingState:存储经过AggregatingState计算后的结果。
RichFunction 中 RuntimeContext 提供如下方法:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
官网例子:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(value -> value.f0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
2. 使用状态
2.1 定义和访问状态
- 使用Flink的Stateful Functions API(如KeyedProcessFunction、ProcessFunction等)来定义和访问状态。
- 通过getRuntimeContext().getState(…)或特定的状态描述符(如ValueStateDescriptor)来获取状态。
2.2 更新状态
- 在处理事件时,根据业务逻辑更新状态。
- 使用状态提供的更新方法(如ValueState.update(T value))来更新状态值。
2.3 使用状态的注意事项
- 确保在适当的时机(如处理完事件后)更新状态。
- 注意状态的清理和释放,以避免内存泄漏。
3. 状态管理
3.1 状态后端(State Backend)
- Flink使用状态后端来存储和管理状态数据。
- 可用的状态后端包括MemoryStateBackend(内存存储,适用于测试和轻量级应用)、FsStateBackend(文件系统存储,适用于生产环境)和RocksDBStateBackend(基于RocksDB的存储,适用于大规模应用)。
3.2 状态持久化
- Flink支持将状态数据持久化到外部存储系统(如HDFS、S3等),以确保在发生故障时能够恢复状态。
- 配置状态后端时,可以指定持久化选项和参数。
3.3 状态清理
- Flink允许为状态设置时间戳限制(TTL),以便在达到指定时间后自动清理过期状态。
- 使用StateTtlConfig类配置状态TTL。
状态有效期 (TTL) 官网例子:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。
TTL 的更新策略(默认是 OnCreateAndWrite):
- StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
- StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
过期数据的清理
- 全量快照时进行清理
启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupFullSnapshot()
.build();
- 增量数据清理
在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(10, true)
.build();
该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
- RocksDB 压缩时清理
使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build();
Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。
定期压缩可以加速过期状态条目的清理,特别是对于很少访问的状态条目。 比这个值早的文件将被选取进行压缩,并重新写入与之前相同的 Level 中。 该功能可以确保文件定期通过压缩过滤器压缩。 您可以通过StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime) 方法设定定期压缩的时间。 定期压缩的时间的默认值是 30 天。 您可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理,但它将会触发更多压缩。
3.4 故障恢复
- 当Flink任务发生故障时,可以从最近的检查点(Checkpoint)恢复状态和数据。
- 检查点机制是Flink实现容错性的关键组成部分。
Broadcast State
Broadcast State是一种特殊的算子状态(Operator State),它支持将一个流中的元素广播到所有下游任务的使用情形。
1. 定义与用途
Broadcast State用于保持所有子任务状态相同,确保当数据被广播到所有下游并行任务时,这些任务可以访问相同的状态数据。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流,这个例子自然而然地运用了广播状态。 考虑到上述这类使用情形,广播状态和其他算子状态的不同之处在于:
- 它具有 map 格式,
- 它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流,
- 这类算子可以拥有不同命名的多个广播状态 。
2. 特性
- 广播性:数据从一个流广播到所有下游并行任务。
- 共享性:所有下游任务共享相同的状态数据。
- map格式:Broadcast State通常具有map的数据结构,允许基于key来存储和访问数据。
3. 使用场景
- 当需要在所有并行任务之间共享相同的配置或元数据时。
- 当需要从一个流广播数据到所有其他流或并行任务时。
4. 实现与访问
- Flink提供了专门的API来定义和访问Broadcast State。
- 通常,你需要创建一个实现了BroadcastProcessFunction或KeyedBroadcastProcessFunction的类,并在其中定义和访问Broadcast State。
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
- 得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- 查询元素的时间戳:ctx.timestamp()
- 查询目前的Watermark:ctx.currentWatermark()
- 目前的处理时间(processing time):ctx.currentProcessingTime()
- 产生旁路输出:ctx.output(OutputTag outputTag, X value)
5. 示例
假设一个场景,其中需要从一个流(称之为“配置流”)广播配置数据到多个处理其他数据流的并行任务。
- **定义配置流:**这可以是一个包含配置信息的流。
- 创建BroadcastProcessFunction:
- 实现BroadcastProcessFunction接口。
- 在processBroadcastElement方法中处理来自配置流的元素,并更新Broadcast State。
- 创建DataStream并连接配置流:
- 使用connect方法将主数据流和配置流连接起来。
- 应用BroadcastProcessFunction到连接后的流。
- 在下游任务中访问Broadcast State:
- 在processElement方法中,你可以通过BroadcastProcessFunction的上下文访问Broadcast State。