0
点赞
收藏
分享

微信扫一扫

「事件驱动架构」Kafka中的模式注册表和模式演化


「事件驱动架构」Kafka中的模式注册表和模式演化_人工智能

在这篇文章中,我们将通过Kafka模式注册表来研究Kafka中的模式演化和兼容性类型。通过对兼容性类型的良好理解,我们可以安全地随着时间的推移对模式进行更改,而不会无意中破坏生产者或消费者的利益。

数据集

在我们的“真实世界中的Hadoop开发者”课程中,有一章专门讨论Kafka。在这一章中,我们从Meetup.com直播RSVP数据到Kafka编写我们自己的产品质量,部署就绪,生产者和消费者与Spring Kafka集成。我们将使用来自Meetup.com的RSVP数据流来解释Kafka模式注册表的模式演化和兼容类型。

用例和项目设置

假设Meetup.com决定使用Kafka来分发RSVPs。在这种情况下,producer程序将由Meetup.com管理,如果我想使用Meetup.com生成的RSVPs,我必须连接到Kakfa集群并使用RSVPs。对于我来说,作为一个消费消息的消费者,我首先需要知道的是模式,即RSVP消息的结构。Kafka中消息的典型模式是这样的。

{ "namespace": "com.hirw.kafkaschemaregistry.producer", "type": "record", "name": "Rsvp", "fields": [ { "name": "rsvp_id", "type": "long" }, { "name": "group_name", "type": "string" }, { "name": "event_id", "type": "string" }, { "name": "event_name", "type": "string" }, { "name": "member_id", "type": "int" }, { "name": "member_name", "type": "string" } ]}

该模式列出了消息中的字段以及数据类型。您可以将模式想象为生产者和消费者之间的契约。当producer生成消息时,它将使用此模式来生成消息。因此,在本例中,每个RSVP消息将具有rsvp_id、group_name、event_id、event_name、member_id和member_name。

Producer是一个Spring Kafka项目,使用上述模式向Kafka编写Rsvp消息。因此,所有发送到Kafka主题的消息都将使用上述模式编写,并使用Avro序列化。我们假设producer代码是由meetup.com维护的。Consumer也是Spring Kafka项目,消费写给Kafka的消息。消费者还将使用上面的模式并使用Avro反序列化Rsvp消息。我们维护消费者项目。

问题

Meetup.com采用了这种分发回函的新方式——通过Kafka。生产者和消费者都同意这个模式,一切都很好。认为模式会永远这样是愚蠢的。假设meetup.com觉得提供member_id字段没有价值并删除了它。你认为会发生什么?这会影响到消费者吗?

member_id字段没有默认值,它被认为是必需的列,因此此更改将影响用户。当生产者删除一个必需的字段时,消费者将看到如下错误

导致:org.apache. kafaca .common. Error . serializationexception:反序列化id为63的Avro消息错误

引起的:org.apache.avro。AvroTypeException:发现com.hirw.kafkaschemaregistry.producer.Rsvp,

期待com.hirw.kafkaschemaregistry.producer.Rsvp,缺少必需字段member_id

「事件驱动架构」Kafka中的模式注册表和模式演化_编程语言_02

如果消费者付钱给消费者,他们会很生气,这将是一个代价非常高昂的错误。有办法避免这样的错误吗?幸运的是,有一些方法可以避免Kafka模式注册表和兼容性类型的错误。Kafka模式注册表为我们提供了检查我们对提议的新模式所做更改的方法,并确保我们对模式所做的更改与现有模式兼容。对于我们的模式,哪些更改是允许的,哪些更改是不允许的,这取决于在主题级别定义的兼容性类型。

在Kafka中有几种兼容性类型。现在让我们逐个研究一下。

落后的(BACKWARD)

如果能够使用新模式生成的数据的使用者也能够使用当前模式生成的数据,则认为模式是向后兼容的。

如果没有显式指定兼容性类型,则向后兼容性类型是架构注册表的默认兼容性类型。现在让我们尝试理解当我们从新模式中删除member_id字段时发生了什么。新的模式向后兼容吗?

在新的模式中,我们删除了member_id。假设消费者已经在使用新模式生成的数据—我们需要询问他是否可以使用旧模式生成的数据。答案是肯定的。在新的模式中,member_id不存在,所以如果向消费者提供了member_id数据,也就是在当前模式中,他读取数据没有问题,因为额外的字段是可以的。所以我们可以说新模式是向后兼容的,Kafka模式注册表将允许这个新模式。

但不幸的是,正如我们在演示中看到的那样,这个变化将影响现有的客户。因此,在向后兼容模式中,使用者应该首先进行更改以适应新模式。这意味着,我们需要首先对消费者进行模式更改,然后才能对生产者进行更改。

如果您对使用者有控制权,或者使用者正在驱动对模式的更改,那么这是可以的。在某些情况下,消费者不会乐意为自己做出改变,尤其是如果他们是付费消费者的话。在这种情况下,向后兼容并不是最好的选择。

「事件驱动架构」Kafka中的模式注册表和模式演化_java_03

如果消费者受到更改的影响,为什么架构注册表首先允许更改?

兼容性类型并不保证所有的更改对所有人都是透明的。它为我们提供了一个指导原则,帮助我们理解对于给定的兼容性类型,哪些更改是允许的,哪些更改是不允许的。当允许对兼容类型进行更改时,通过对兼容类型的良好理解,我们将能够更好地了解谁将受到影响,从而能够采取适当的措施。

在我们的当前实例中,允许根据向后兼容类型在新模式中删除member_id。因为根据向后兼容性,能够使用带有新模式的out member_id的RSVP的使用者将能够使用带有member_id的旧模式的RSVP。因此,根据向后兼容性允许进行更改,但这并不意味着如果处理不当,更改不会造成破坏。

在向后兼容模式下,最好在更改模式之前先通知使用者。在我们的例子中,meetup.com应该通知消费者member_id将被删除,并让消费者先删除对member_id的引用,然后改变生产者来删除member_id。这是处理这种特定模式更改的最合适的方法。

在向后兼容模式下,我可以在新模式中添加一个没有默认值的字段吗?

这里,我们试图添加一个名为response的新字段,它实际上是用户的RSVP响应,并且没有默认值。在向后兼容类型中,这种模式更改是否可以接受?你觉得呢?

让我们来看看。通过向后兼容模式,能够使用新模式生成的数据的使用者也能够使用当前模式生成的数据。

因此,假设消费者已经在使用没有默认值的response数据,这意味着它是必需的字段。现在,他可以使用没有响应的当前模式生成的数据吗?答案是否定的,因为使用者希望在数据中得到响应,因为它是必需的字段。因此,建议的模式更改不是向后兼容的,而且模式注册表一开始就不允许这种更改。

这个错误非常清楚,说明“正在注册的模式与以前的模式不兼容”

「事件驱动架构」Kafka中的模式注册表和模式演化_java_04

因此,如果模式与设置的兼容性类型不兼容,模式注册表将拒绝更改,这是为了防止意外更改。

如果我们用默认值更改字段响应会怎样?这种更改是否被认为是向后兼容的?

回答这个问题——“已经在使用响应默认值为“无响应”的数据的消费者是否可以使用当前模式生成的没有响应的数据?”

答案是肯定的,因为当缺少响应字段时,使用带有response的新模式生成的数据的消费者将替换默认值,这是使用当前模式生成数据时的情况。

为了总结,向后兼容性允许删除和添加具有默认值的字段到模式中。与使用默认值添加字段不同,删除字段将影响用户,因此最好首先使用向后兼容类型更新用户。

「事件驱动架构」Kafka中的模式注册表和模式演化_人工智能_05

BACKWARD_TRANSITIVE

向后兼容性类型检查新版本和当前版本,如果需要对所有注册版本进行此检查,则需要使用向后兼容性类型。

「事件驱动架构」Kafka中的模式注册表和模式演化_java_06

向前(FORWARD)

好的,到目前为止,我们已经看到了向后和向后传递兼容性类型。

但是,如果我们不希望模式改变影响当前的消费者呢?也就是说,我们希望避免在从模式中删除member_id时发生的情况。当我们删除member_id时,它突然影响了我们的消费者。如果消费者是付费客户,他们会很生气,这会对你的声誉造成打击。那么,我们如何避免这种情况呢?

我们可以使用向前兼容类型,而不是向后使用默认兼容类型。如果使用当前模式生成的数据的消费者也能够使用新模式生成的数据,则认为模式是转发兼容的。

使用此规则,我们将不能删除新模式中没有默认值的列,因为这会影响使用当前模式的消费者。所以添加字段是可以的,删除可选字段也可以。

「事件驱动架构」Kafka中的模式注册表和模式演化_java_07

如何改变一个主题的兼容性类型?

在指定主题名称的配置上发出PUT请求,并在请求的主体中将兼容性指定为FORWARD。就是这样。让我们发出请求。

「事件驱动架构」Kafka中的模式注册表和模式演化_java_08

现在,当我们检查主题上的配置时,我们会看到兼容性类型现在被设置为FORWARD。既然主题的兼容性类型已更改为FORWARD,我们就不允许删除必需的字段,即没有默认值的列。让我们确认。为什么我们不尝试删除event_id字段,这是一个必需字段。

让我们通过发出REST命令来更新主题的模式。要更新模式,我们将发布一个包含新模式主体的POST。在这个模式中,我们删除了字段event_id。

{“schema”:”{\”type\”:\”record\”,\”name\”:\”Rsvp\”,\”namespace\”:\”com.hirw.kafkaschemaregistry.producer\”,\”fields\”:[{\”name\”:\”rsvp_id\”,\”type\”:\”long\”},{\”name\”:\”group_name\”,\”type\”:\”string\”},{\”name\”:\”event_name\”,\”type\”:\”string\”},{\”name\”:\”member_name\”,\”type\”:\”string\”},{\”name\”:\”venue_name\”,\”type\”:\”string\”,\”default\”:\”Not Available\”}]}”}

请参阅将兼容性类型设置为转发更新实际失败。即使更改了代码、更新了模式并推出了RSVPs,也会收到相同的响应。使用前向兼容性类型,可以保证使用当前模式的消费者能够使用新模式。

「事件驱动架构」Kafka中的模式注册表和模式演化_人工智能_09

FORWARD_TRANSITIVE

仅FORWARD检查带有当前模式的新模式,如果您想检查所有注册的模式,需要将兼容性类型更改为,您猜对了——FORWARD_TRANSITIVE。

「事件驱动架构」Kafka中的模式注册表和模式演化_大数据_10

完整和没有(FULL & NONE)

还有另外3种兼容性类型。如果希望模式向前兼容和向后兼容,那么可以使用FULL。对于完全兼容类型,只允许添加或删除具有默认值的可选字段。使用当前模式完全检查新模式。如果希望根据所有注册的模式检查新模式,可以使用FULL_TRANSITIVE。与其他类型相比,完全和完全传递兼容性类型具有更多的限制性。

最后一种兼容类型是NONE。NONE表示禁用所有兼容类型。这意味着所有更改都是可能的,这是有风险的,通常不会在生产中使用。

谢谢大家关注,转发,点赞。

举报

相关推荐

0 条评论