0
点赞
收藏
分享

微信扫一扫

阿里云实时计算平台Flink报错汇总

忆北文学摄影爱好员 2022-04-22 阅读 83

一、报错问题

1.2022.04.14记录

全托管实时计算平台

所有cdc模式都不支持窗口,如果加上cdc模式会报错:

org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, dwd, holo_dwd_0_wangshuaizun_test_source_test]], fields=[id2, id5]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:391) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:339) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:338) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:339) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:338) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:339) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:338) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$

 

二、问题交流:

1、不停⽌Flink作业的情况下,可以动态修改Flink SQL代码并部署上线?

⽬前没有,需要和客户沟通这部分的需求,Flink CEP在做这个能⼒,另外动态修改并⾏度是不是需

求?

2、flink sql必须重启才能⽣效吗?

和问题1⼀致。

3、打包发布有没有预发这种⽅案?有没有灰度测试的⽅案?

预发和测试上推荐⽤户使⽤SavePoint的⽅式进⾏,后边会规划上线如下功能:⽤户作业A的SP可以

被B使⽤,双跑验证。

4、flink经常开发重启,会影响服务启动过程3分钟左右的时间,有没有什么好的解决⽅案?

⼀⽅⾯我们在社区推动Native SP,这个功能在1.16会ready,云上会在下半年,另⼀⽅⾯我们在优化

作业启停链路的优化

5、flink管理页⾯经常崩溃、启动经常出问题,给我们⼀个推荐的上线时间,响应速度较快的时间区

间。另外就是如果等⼯单,基本都太慢。

这部分应该是通知的问题,基本上在我们的发布时会出现。

6、之前有个任务出现state写⼊太⼤会失败的问题,建议的⽅案是设置ttl(但是不能⼩于统计周

期),增加pod让单个pod的state变⼩,但是如果遇上需要排序的场景单个pod量就是⾮常⼤的情

况,怎么办?

优化⽅案】

对于作业配置Gemini statebackend(系统默认)的情况,有如下四种⽅式进⾏调优

a. 清理state

结合业务逻辑,state层⾯配置ttl, 这个是state负责数据清理的,数据进到state之后,经过ttl设

置的时长就⽆效了,配置参数可参考: https://help.aliyun.com/document_detail/

313172.html#section-ukm-5ky-kdo

DataStream作业,⽤户可以⾃⾏在代码中清理state,将不⽤的数据⾃⼰delete掉

b. 对state进⾏压缩

VVR 3.x 版本或VVR4.x版本:可以配置 state.backend.gemini.page.flush.local.compression:

Lz4 参数,对本地state进⾏压缩,能降低本地磁盘空间

c. 配置state持久化到DFS(VVP中即为OSS)

VVR-4.x版本 (4.0.11及以上)( 推荐 ): 配置 state.backend.gemini.file.cache.type:

LIMITED 参数,会校验本地盘的剩余空间,当剩余空间不⾜2GB(默认值)时,会将超⽤空间的

⽂件evict到远程DFS或者直接写⼊到远程DFS。相当于本地盘是作为⼀个local file cache。如果

想要调整剩余空间的阈值,可以配置 state.backend.gemini. file.cache.preserved-space,默

认值是2GB。

VVR-3.x 版本 (3.0.3 及以上):配置 state.backend.gemini.file.cache.type: LIMITED 参数,

该参数⽣效后,会对每个slot的state占⽤设置18GB(默认值)的state限制,超过 18G数据会

evict到远程DFS,下次读取该⽂件内容的时候,会直接从DFS读取。由于该配置仅针对单个

slot,⽽⽬前单个pod的本次磁盘限制是20GB,所以对于slot数⽬⼤于1的情况下,还需要配置

state.backend.gemini.file.cache.capacity 参数。例如对于⼀个配置成4个slot的TM,如果我们

希望整体state的空间占⽤在15GB,那么就需要将该数值配置成 3750mb (这样四个slot所占的整

体空间是 4 * 3750mb = 15GB,不要配置⾥⾯带⼩数点,会解析出问题)

说明:以上参数在 ⾼级配置 -> 更多 Flink 配置

中配置即可

调⼤并发数,从⽽扩⼤Pod数,减少单个Pod的state⼤⼩

在 ⾼级配置 中调⼤ Task Managers

对于作业配置rocksdb statebackend的情况,有如下两种⽅式进⾏调优:

清理state

结合业务逻辑,state层⾯配置ttl, 这个是state负责数据清理的,数据进到state之后,经过ttl设

置的时长就⽆效了,配置参数可参考: https://help.aliyun.com/document_detail/

313172.html#section-ukm-5ky-kdo

DataStream作业,⽤户可以⾃⾏在代码中清理state,将不⽤的数据⾃⼰delete掉

调⼤并发数,从⽽扩⼤ Pod 数,减少单个Pod的state⼤⼩

在 ⾼级配置 中调⼤ Task Managers

数量

7、flink sql⾥⾯cdc模式读取mysql数据,两个cdc源进⾏join的时候,如果在⼀个窗⼝内等不到另⼀

个的数据会怎么处理?会去全量读mysql数据吗?

如果等不到就为null了吧,不会mysql去读了,回先发null,等能关联上了再回撤null值下发关联上的

值,这个和source是啥没关系,双流join的语义就是这样的。

后续Action:

需要明确后续快速升级路径和服务。

需要对实时特征⼯程进⾏进⼀步交流。

举报

相关推荐

0 条评论