Table of Contents
- KafkaTopicPartitionState
- code
- Introduction
KafkaTopicPartitionState
code
package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.flink.annotation.Internal;

/**
 * A key data structure for passing messages between Flink and Kafka.
 * The state that the Flink Kafka Consumer holds for each Kafka partition.
 * Includes the Kafka descriptor for partitions.
 *
 * <p>This class describes the most basic state (only the offset), subclasses
 * define more elaborate state, containing current watermarks and timestamp
 * extractors.
 *
 * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
 */
@Internal
public class KafkaTopicPartitionState<KPH> {

    // ------------------------------------------------------------------------

    /** The Flink description of a Kafka partition. */
    private final KafkaTopicPartition partition; // Flink-side descriptor: topic name and partition number

    /** The Kafka description of a Kafka partition (varies across different Kafka versions). */
    private final KPH kafkaPartitionHandle; // Kafka-side handle; generic (KPH) because its type differs across Kafka versions

    /** The offset within the Kafka partition that we already processed. */
    private volatile long offset; // offset of the last record processed; this is the value stored in a checkpoint

    /** The offset of the Kafka partition that has been committed. */
    private volatile long committedOffset; // offset that has already been committed

    // ------------------------------------------------------------------------

    // Constructor
    public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
        this.partition = partition;
        this.kafkaPartitionHandle = kafkaPartitionHandle;
        this.offset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; // sentinel: nothing processed yet
        this.committedOffset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; // sentinel: nothing committed yet
    }

    // ------------------------------------------------------------------------

    /**
     * Gets Flink's descriptor for the Kafka Partition.
     * @return The Flink partition descriptor.
     */
    public final KafkaTopicPartition getKafkaTopicPartition() {
        return partition;
    }

    /**
     * Gets Kafka's descriptor for the Kafka Partition.
     * @return The Kafka partition descriptor.
     */
    public final KPH getKafkaPartitionHandle() {
        return kafkaPartitionHandle;
    }

    public final String getTopic() {
        return partition.getTopic();
    }

    public final int getPartition() {
        return partition.getPartition();
    }

    /**
     * The current offset in the partition. This refers to the offset of the last element that
     * we retrieved and emitted successfully. It is the offset that should be stored in
     * a checkpoint.
     */
    public final long getOffset() {
        return offset;
    }

    public final void setOffset(long offset) {
        this.offset = offset;
    }

    public final boolean isOffsetDefined() {
        return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
    }

    public final void setCommittedOffset(long offset) {
        this.committedOffset = offset;
    }

    public final long getCommittedOffset() {
        return committedOffset;
    }

    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle
                + ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)");
    }
}
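Why does the constructor use a sentinel instead of simply starting at 0? Offset 0 is a valid position in a Kafka partition, so "nothing processed yet" needs a value that can never be a real offset. Below is a minimal, self-contained sketch of that idea. The class `PartitionOffsetSketch` and the value chosen for its `OFFSET_NOT_SET` are made up for illustration; the real constant lives in `KafkaTopicPartitionStateSentinel` and may differ.

```java
// Simplified stand-in for KafkaTopicPartitionState; NOT the Flink class itself.
final class PartitionOffsetSketch {

    // Hypothetical sentinel value, chosen only because it can never be a real offset.
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    private final String topic;
    private final int partition;

    // volatile so that a value written by one thread is visible to readers on another
    private volatile long offset = OFFSET_NOT_SET;

    PartitionOffsetSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    void setOffset(long offset) {
        this.offset = offset;
    }

    long getOffset() {
        return offset;
    }

    // True once any real offset, including 0, has been recorded.
    boolean isOffsetDefined() {
        return offset != OFFSET_NOT_SET;
    }

    @Override
    public String toString() {
        return topic + "-" + partition
                + ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)");
    }
}
```

With this shape, a freshly created state reports its offset as "(not set)", while a state whose offset has been set to 0 is treated as defined.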
Introduction
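KafkaTopicPartitionState is a core data structure. Through its four fields, the Flink Kafka Consumer ties together the topic and partition (both held in the Flink-side KafkaTopicPartition descriptor), the version-specific Kafka partition handle, the offset already processed, and the offset already committed. The Flink Kafka Consumer's fault-tolerance mechanism relies on this data.
The four member variables are the most important part; the rest is just the constructor, the getters and setters, and toString(). Since the class is short, the full source is reproduced above.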
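To make the role of the two offsets more concrete, here is a simplified sketch of how per-partition state like this could support fault tolerance: the processed offsets are copied into a snapshot, and once that snapshot is confirmed, the same values are recorded as committed. This is not Flink's actual checkpoint code. `TrackedPartition`, `OffsetCheckpointSketch`, the string keys, and the sentinel value are all hypothetical names and choices made for this example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the per-partition state; NOT the Flink class.
final class TrackedPartition {
    static final long OFFSET_NOT_SET = Long.MIN_VALUE; // assumed sentinel value

    final String topic;
    final int partition;
    volatile long offset = OFFSET_NOT_SET;          // last processed record
    volatile long committedOffset = OFFSET_NOT_SET; // last committed offset

    TrackedPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String key() {
        return topic + "-" + partition;
    }
}

final class OffsetCheckpointSketch {

    // Copies the processed offset of every partition that has one.
    // The copy is independent of later progress on the partitions.
    static Map<String, Long> snapshot(List<TrackedPartition> partitions) {
        Map<String, Long> snap = new HashMap<>();
        for (TrackedPartition p : partitions) {
            if (p.offset != TrackedPartition.OFFSET_NOT_SET) {
                snap.put(p.key(), p.offset);
            }
        }
        return snap;
    }

    // Called once the snapshot is confirmed: record its offsets as committed.
    static void markCommitted(List<TrackedPartition> partitions, Map<String, Long> snap) {
        for (TrackedPartition p : partitions) {
            Long confirmed = snap.get(p.key());
            if (confirmed != null) {
                p.committedOffset = confirmed;
            }
        }
    }
}
```

The point of the sketch is that `offset` keeps advancing with every record, while `committedOffset` only moves when a snapshot has been confirmed, so after a failure processing can resume from the snapshotted offsets rather than from wherever the reader happened to be.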