Background
This article is based on StarRocks 3.1.7.
We have been running some data-analysis workloads on StarRocks (mostly simple queries), and a colleague hit a concurrency problem:
2024-11-27 07:04:34,048 WARN (starrocks-mysql-nio-pool-214933|3593819) [StmtExecutor.execute():643] execute Exception, sql SELECT distinct(id) FROM `db`.`table` WHERE col1='xxx' AND col2='xxx'
java.util.ConcurrentModificationException: null
at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) ~[?:?]
at java.util.LinkedHashMap$LinkedValueIterator.next(LinkedHashMap.java:746) ~[?:?]
at com.starrocks.sql.optimizer.statistics.StatisticsCalcUtils.deltaRows(StatisticsCalcUtils.java:176) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.statistics.StatisticsCalcUtils.getTableRowCount(StatisticsCalcUtils.java:114) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.statistics.StatisticsCalculator.computeOlapScanNode(StatisticsCalculator.java:257) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.statistics.StatisticsCalculator.visitLogicalOlapScan(StatisticsCalculator.java:225) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.statistics.StatisticsCalculator.visitLogicalOlapScan(StatisticsCalculator.java:161) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator.accept(LogicalOlapScanOperator.java:149) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.statistics.StatisticsCalculator.estimatorStats(StatisticsCalculator.java:177) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.task.DeriveStatsTask.execute(DeriveStatsTask.java:57) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.task.SeriallyTaskScheduler.executeTasks(SeriallyTaskScheduler.java:69) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.Optimizer.memoOptimize(Optimizer.java:595) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.Optimizer.optimizeByCost(Optimizer.java:201) ~[starrocks-fe.jar:?]
at com.starrocks.sql.optimizer.Optimizer.optimize(Optimizer.java:134) ~[starrocks-fe.jar:?]
at com.starrocks.sql.StatementPlanner.createQueryPlan(StatementPlanner.java:146) ~[starrocks-fe.jar:?]
at com.starrocks.sql.StatementPlanner.planQuery(StatementPlanner.java:121) ~[starrocks-fe.jar:?]
at com.starrocks.sql.StatementPlanner.plan(StatementPlanner.java:92) ~[starrocks-fe.jar:?]
at com.starrocks.sql.StatementPlanner.plan(StatementPlanner.java:61) ~[starrocks-fe.jar:?]
at com.starrocks.qe.StmtExecutor.execute(StmtExecutor.java:456) ~[starrocks-fe.jar:?]
at com.starrocks.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:392) ~[starrocks-fe.jar:?]
at com.starrocks.qe.ConnectProcessor.dispatch(ConnectProcessor.java:506) ~[starrocks-fe.jar:?]
at com.starrocks.qe.ConnectProcessor.processOnce(ConnectProcessor.java:782) ~[starrocks-fe.jar:?]
at com.starrocks.mysql.nio.ReadListener.lambda$handleEvent$0(ReadListener.java:69) ~[starrocks-fe.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
The table involved is defined as follows:
CREATE TABLE `table` (
    `id` bigint(20) NOT NULL,
    `create_date` datetime NOT NULL,
    `col1` varchar(64) NOT NULL,
    `col2` varchar(20) NOT NULL
) ENGINE = OLAP
PRIMARY KEY(`create_date`, `id`)
COMMENT ""
PARTITION BY date_trunc('month', create_date)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "enable_persistent_index" = "true",
    "replicated_storage" = "true",
    "partition_live_number" = "18",
    "compression" = "LZ4"
);
Conclusion
For partitioned tables with a TTL, StarRocks runs a background thread that periodically drops expired partitions; the polling interval is controlled by dynamic_partition_check_interval_seconds.
Meanwhile, when a query is executed, StarRocks parses the SQL and runs cost-based (CBO) optimization, during which it collects partition-level statistics for the tables involved. If that statistics pass happens to overlap with the background thread dropping a partition, the iteration fails with a ConcurrentModificationException.
For now, you can follow the issue ConcurrentModificationException when query during drop partition, and increase dynamic_partition_check_interval_seconds (currently 10 minutes on our cluster) to reduce the probability of this collision.
Analysis
Two components are involved:
the query path (mainly StmtExecutor.execute) and the background partition-cleanup path (DynamicPartitionScheduler).
Query path
Every SQL query goes through StmtExecutor.execute, which eventually produces a physical execution plan. Plan generation passes through the Optimizer stage, and since optimization is cost-based (CBO) by default, it estimates how many rows each involved table will scan, which ends up calling StatisticsCalcUtils.deltaRows:
private static long deltaRows(Table table, long totalRowCount) {
    long tblRowCount = 0L;
    for (Partition partition : table.getPartitions()) {
        long partitionRowCount;
        TableStatistic tableStatistic = GlobalStateMgr.getCurrentStatisticStorage()
                .getTableStatistic(table.getId(), partition.getId());
        if (tableStatistic.equals(TableStatistic.unknown())) {
            partitionRowCount = partition.getRowCount();
        } else {
            partitionRowCount = tableStatistic.getRowCount();
        }
        tblRowCount += partitionRowCount;
    }
    if (tblRowCount < totalRowCount) {
        return Math.max(1, (totalRowCount - tblRowCount) / table.getPartitions().size());
    } else {
        return 0;
    }
}
This iterates over table.getPartitions(), i.e. over the OlapTable's idToPartition.values(). Note that idToPartition is an ordinary, non-thread-safe map (a LinkedHashMap in this version, as the LinkedHashIterator frames in the stack trace show), so its iterators are fail-fast.
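The failure mode is easy to reproduce outside of StarRocks. The sketch below is not StarRocks code: the map name merely mimics OlapTable.idToPartition, and for simplicity the removal happens inline during iteration, but the fail-fast check trips the same way when a second thread removes the entry.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

public class CmeDemo {
    // Returns true if removing an entry while iterating values() throws
    // ConcurrentModificationException (the fail-fast behavior hit by the FE).
    static boolean reproduce() {
        // Stand-in for OlapTable.idToPartition: partition id -> partition name.
        Map<Long, String> idToPartition = new LinkedHashMap<>();
        for (long id = 1; id <= 4; id++) {
            idToPartition.put(id, "p" + id);
        }
        try {
            // Query side: StatisticsCalcUtils.deltaRows iterating the partitions.
            for (String partition : idToPartition.values()) {
                // Scheduler side: OlapTable.dropPartition ends in
                // idToPartition.remove(partition.getId()).
                idToPartition.remove(2L);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME reproduced: " + reproduce());
    }
}
```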
The overall call chain is:
StmtExecutor.execute
||
\/
StatementPlanner.plan
||
\/
StatementPlanner.planQuery
||
\/
StatementPlanner.createQueryPlan
||
\/
Optimizer.optimize
||
\/
Optimizer.optimizeByCost
||
\/
Optimizer.memoOptimize
||
\/
SeriallyTaskScheduler.executeTasks
||
\/
DeriveStatsTask.execute
||
\/
StatisticsCalculator.estimatorStats
||
\/
StatisticsCalculator.computeOlapScanNode
||
\/
StatisticsCalcUtils.getTableRowCount
||
\/
StatisticsCalcUtils.deltaRows
Background cleanup path
For partitioned tables with a TTL, the background thread DynamicPartitionScheduler performs the partition deletion. The relevant code:
protected void runAfterCatalogReady() {
    if (!initialize) {
        // check Dynamic Partition tables only when FE start
        initDynamicPartitionTable();
    }
    setInterval(Config.dynamic_partition_check_interval_seconds * 1000L);
    if (Config.dynamic_partition_enable) {
        executeDynamicPartition();
    }
    executePartitionTimeToLive();
}
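The polling structure above can be sketched with a plain ScheduledExecutorService. This is a hypothetical illustration, not StarRocks code: INTERVAL_MILLIS stands in for Config.dynamic_partition_check_interval_seconds (10 minutes on the cluster discussed here), shortened so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PartitionTtlScheduler {
    // Stand-in for Config.dynamic_partition_check_interval_seconds,
    // shortened from minutes to milliseconds for the demo.
    static final long INTERVAL_MILLIS = 50;

    // Returns true once the periodic task has fired at least twice.
    static boolean firesAtLeastTwice() {
        CountDownLatch firedTwice = new CountDownLatch(2);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Mirrors the shape of DynamicPartitionScheduler.runAfterCatalogReady:
        // wake up on a fixed interval and do the partition maintenance work.
        scheduler.scheduleWithFixedDelay(() -> {
            // executeDynamicPartition() / executePartitionTimeToLive() would run here.
            firedTwice.countDown();
        }, 0, INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        try {
            return firedTwice.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("scheduler fired at least twice: " + firesAtLeastTwice());
    }
}
```

Any query whose statistics pass overlaps one of these wake-ups races against the deletion, which is why lengthening the interval only lowers, not eliminates, the collision probability.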
The deletion frequency is determined by Config.dynamic_partition_check_interval_seconds, i.e. the FE config item dynamic_partition_check_interval_seconds.
The executeDynamicPartition method performs the actual partition deletion; the call flow is:
executeDynamicPartition
||
\/
executeDynamicPartitionForTable
||
\/
getDropPartitionClause
||
\/
GlobalStateMgr.getCurrentState().dropPartition(db, olapTable, dropPartitionClause);
||
\/
olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop());
||
\/
idToPartition.remove(partition.getId());
In executeDynamicPartitionForTable, the cast RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo(); gates the work on the table's PartitionInfo: only RangePartitionInfo supports partition TTL deletion, i.e. only Expression partitioning (recommended) and Dynamic partitioning are eligible.
The final idToPartition.remove(partition.getId()) removes an entry from the very idToPartition.values() collection that the query thread may be iterating at that moment, which is exactly what triggers the ConcurrentModificationException.
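The issue linked in the conclusion tracks the upstream fix. As a general illustration of why this class of bug disappears with a concurrent collection (this is a hypothetical sketch, not the actual StarRocks patch), the same remove-while-iterating access against a ConcurrentHashMap does not throw, because its iterators are weakly consistent rather than fail-fast:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeaklyConsistentDemo {
    // Returns true if iterating values() while removing an entry
    // completes without throwing ConcurrentModificationException.
    static boolean iterateWhileRemoving() {
        // Same shape as the LinkedHashMap reproduction, but concurrent.
        Map<Long, String> idToPartition = new ConcurrentHashMap<>();
        for (long id = 1; id <= 4; id++) {
            idToPartition.put(id, "p" + id);
        }
        try {
            for (String ignored : idToPartition.values()) {
                // Concurrent "drop partition": weakly consistent iterators
                // tolerate structural modification mid-iteration.
                idToPartition.remove(2L);
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("no CME: " + iterateWhileRemoving());
    }
}
```

An alternative technique is to iterate over a defensive snapshot (e.g. copying the values while holding the table lock); either way, the statistics pass no longer observes structural modification mid-iteration.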