一、报错问题
1.2022.04.14记录
全托管实时计算平台
所有cdc模式都不支持窗口,如果加上cdc模式会报错:
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, dwd, holo_dwd_0_wangshuaizun_test_source_test]], fields=[id2, id5]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:391) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:339) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:338) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:339) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:338) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:339) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:338) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$
二、问题交流:
1、不停⽌Flink作业的情况下,可以动态修改Flink SQL代码并部署上线?
⽬前没有,需要和客户沟通这部分的需求,Flink CEP在做这个能⼒,另外动态修改并⾏度是不是需
求?
2、flink sql必须重启才能⽣效吗?
和问题1⼀致。
3、打包发布有没有预发这种⽅案?有没有灰度测试的⽅案?
预发和测试上推荐⽤户使⽤SavePoint的⽅式进⾏,后边会规划上线如下功能:⽤户作业A的SP可以
被B使⽤,双跑验证。
4、flink经常开发重启,会影响服务启动过程3分钟左右的时间,有没有什么好的解决⽅案?
⼀⽅⾯我们在社区推动Native SP,这个功能在1.16会ready,云上会在下半年,另⼀⽅⾯我们在优化
作业启停链路的优化
5、flink管理页⾯经常崩溃、启动经常出问题,给我们⼀个推荐的上线时间,响应速度较快的时间区
间。另外就是如果等⼯单,基本都太慢。
这部分应该是通知的问题,基本上在我们的发布时会出现。
6、之前有个任务出现state写⼊太⼤会失败的问题,建议的⽅案是设置ttl(但是不能⼩于统计周
期),增加pod让单个pod的state变⼩,但是如果遇上需要排序的场景单个pod量就是⾮常⼤的情
况,怎么办?
优化⽅案】
● 对于作业配置Gemini statebackend(系统默认)的情况,有如下四种⽅式进⾏调优
a. 清理state
○ 结合业务逻辑,state层⾯配置ttl, 这个是state负责数据清理的,数据进到state之后,经过ttl设
置的时长就⽆效了,配置参数可参考: https://help.aliyun.com/document_detail/
313172.html#section-ukm-5ky-kdo
○ DataStream作业,⽤户可以⾃⾏在代码中清理state,将不⽤的数据⾃⼰delete掉
b. 对state进⾏压缩
○ VVR 3.x 版本或VVR4.x版本:可以配置 state.backend.gemini.page.flush.local.compression:
Lz4 参数,对本地state进⾏压缩,能降低本地磁盘空间
c. 配置state持久化到DFS(VVP中即为OSS)
○ VVR-4.x版本 (4.0.11及以上)( 推荐 ): 配置 state.backend.gemini.file.cache.type:
LIMITED 参数,会校验本地盘的剩余空间,当剩余空间不⾜2GB(默认值)时,会将超⽤空间的
⽂件evict到远程DFS或者直接写⼊到远程DFS。相当于本地盘是作为⼀个local file cache。如果
想要调整剩余空间的阈值,可以配置 state.backend.gemini. file.cache.preserved-space,默
认值是2GB。
VVR-3.x 版本 (3.0.3 及以上):配置 state.backend.gemini.file.cache.type: LIMITED 参数,
该参数⽣效后,会对每个slot的state占⽤设置18GB(默认值)的state限制,超过 18G数据会
evict到远程DFS,下次读取该⽂件内容的时候,会直接从DFS读取。由于该配置仅针对单个
slot,⽽⽬前单个pod的本次磁盘限制是20GB,所以对于slot数⽬⼤于1的情况下,还需要配置
state.backend.gemini.file.cache.capacity 参数。例如对于⼀个配置成4个slot的TM,如果我们
希望整体state的空间占⽤在15GB,那么就需要将该数值配置成 3750mb (这样四个slot所占的整
体空间是 4 * 3750mb = 15GB,不要配置⾥⾯带⼩数点,会解析出问题)
说明:以上参数在 ⾼级配置 -> 更多 Flink 配置
中配置即可 。
调⼤并发数,从⽽扩⼤Pod数,减少单个Pod的state⼤⼩
在 ⾼级配置 中调⼤ Task Managers 数
量
● 对于作业配置rocksdb statebackend的情况,有如下两种⽅式进⾏调优:
清理state
结合业务逻辑,state层⾯配置ttl, 这个是state负责数据清理的,数据进到state之后,经过ttl设
置的时长就⽆效了,配置参数可参考: https://help.aliyun.com/document_detail/
313172.html#section-ukm-5ky-kdo
DataStream作业,⽤户可以⾃⾏在代码中清理state,将不⽤的数据⾃⼰delete掉
调⼤并发数,从⽽扩⼤ Pod 数,减少单个Pod的state⼤⼩
在 ⾼级配置 中调⼤ Task Managers
数量
7、flink sql⾥⾯cdc模式读取mysql数据,两个cdc源进⾏join的时候,如果在⼀个窗⼝内等不到另⼀
个的数据会怎么处理?会去全量读mysql数据吗?
如果等不到就为null了吧,不会mysql去读了,回先发null,等能关联上了再回撤null值下发关联上的
值,这个和source是啥没关系,双流join的语义就是这样的。
后续Action:
需要明确后续快速升级路径和服务。
需要对实时特征⼯程进⾏进⼀步交流。