投稿作者:王春波 内容来源:作者授权发布 出品平台:DataFunTalk
导读:本次分享的是在国内某头部体育用品企业的主品牌数据仓库建设中积累的宝贵经验,项目背景是真实的,作者作为技术经理全程参与数据仓库设计、开发、测试和优化工作,由于身份是乙方,所以这里就不暴露企业名称了。项目基于Hive on Spark搭建数据仓库,完成数据的抽取、转换,最后通过DataX将数据同步到ClickHouse用于BI报表、自助分析和移动Web查询。
该企业在启动主品牌数据仓库前,分布在其它品牌数据仓库上,在上面尝试了多个技术方案,包括基于Greenplum+Oacle模式、Hive+Kylin模式、Hive+Doirs模式的方案,但是以上三个项目最后查询的数据量都是千万级别到亿级,而主品牌则是十亿级的。
根据已有项目经验发现:
① 基于MPP架构的Greenplum虽然查询高效,但不方便进行存算分离,并发查询也不高;
② 基于Kylin模式的查询虽然性能快,但模型构建不太灵活,针对零售BI场景,预先构建Cube的难度非常大;
③ 基于 Doirs 模式的查询引擎虽然性能足够快,但是集群节点要求比较多,项目选型的时候还不支持 DataX 插件 ( 2021年9月底才发布 DorisWriter 插件 )。
所以,我们最后决定选用业界最主流的 Hive + ClickHouse 架构。
01系统架构
本次项目基于集团自研大数据平台进行开发,由系统架构部提供大数据平台支撑,我公司完成数据仓库模型构建和集市层数据加工,基于Hive SQL on Spark完成数据加工,基于自研工具完成数据抽取,基于DataX完成数据从Hive到ClickHouse的同步。前端展现分为移动端和PC端,移动端采用自研平台,进行模块化开发,PC端使用集团采购的商业化软件观远BI来展现。整体系统架构如下图:本次项目对接的下游系统包括新零售、欧宝、EBI、P60、P61、VOS和MDM等系统,主要抽取销售模型、库存明细(包括在途)、店仓和商品主数据等。数据抽取有增量和全量两种方式,采用大数据平台提供的模板化功能完成数据的同步。零售仓库采用标准分层,按照ODS、DWD、DWS、ADS划分,数据的加工全部由Hive脚本来完成,通过git进行代码版本管理,Hive SQL的执行由大数据平台自动提交到Spark on Yarn上。
本次项目的集市层采用ClickHouse(以下简称CK)作为查询引擎,数据在Hive中完成加工后通过DataX同步到ClickHouse。项目按照模块分为移动端和PC端,分别创建zy_mbi和zy_pcbi两个数据库,用于集市数据同步和查询。ClickHouse集群由 4 个节点(320G内存84核CPU)组成物理集群,并划分成 2 个逻辑集群,其中,Nginx或CHProxy用作数据读写时的负载均衡;ClickHouse 在数据副本的同步会依赖 Zookeeper 来存储元信息和协调。CK集群的结构如下图:
两个逻辑集群的设定如下:
- 单分片集群
单分片集群由4个节点组成对等服务,互为备份。当某张表存储在单分片集群,每个节点都存储该表的全量数据,各节点之间的数据会相互同步,往任何一个节点写入数据,都会同步到其它节点中。维表和事实表(10 亿以内),建议存储在单分片集群。
- 多分片集群
多分片集群,将4个节点切分成2个主节点2个备份节点,每个节点存储单表1/2的数据。当某张表存储在多分片集群时,相当于将该表做 sharding,每个分片存储该表的一部分数据,每个分片内的节点数据会自动同步。事实表(10 亿以上)的数据可以存储在多分片集群中。逻辑集群的划分根据配置文件 metrika.xml 进行配置:
在实际的项目应用和性能测试中,我们发现多分片集群在10亿以下数据量并且有关联查询的场景下比单分片集群慢很多(10倍以上的性能差异),因此本次项目的所有的表都部署在单分片集群上,并且由于单分片集群有4个节点,通过Nginx进行负载均衡,并发查询可以达到四倍单节点的效果,在一定程度上缓解了CK并发能力不强的问题。
02 建表规范
在实际项目中,我们在建表方面遵循以下要求:
①每张需要同步的表,我们都创建三张对应表,一张local表、一张local_tmp表、一张跨节点的视图;
②表名和字段名均采用小写;
③视图命名以_v结尾;
④字段类型尽快简单统一,项目主要采用Date、Timestamp、String、Int、Decimal(38,4) 五种数据类型,分别对应Hive的date、timestamp、string、int和decimal(38,4)类型;
⑤ local表和local_temp表采用ReplicatedMergeTree引擎,跨节点的视图采用Distributed引擎。
⑥所有的表分为单分区表和多分区表两种,多分区表采用按月分区或者按日分区,即toYYYYMM(order_dt)和toYYYYMMDD(inv_dt)两种,根据数据量大小来确定;
⑦ORDER BY的字段必须包含该表的业务日期,第二个字段优先选择CMS_CODE;
⑧index_granularity统一设置为8192。
基于以上共识,我们就有了标准建表模板,所有的新增表和修改表操作都参照该目标进行,可以极大的提高生产效率。
03数据同步
前面说到,我们的建表模板需要创建三张表,可能有人会好奇,为什么需要创建local_tmp表。看完这一部分,读者就明白了。由于数据同步可能会失败,并且有可能用户正在使用相关报表,这个时候删除数据再插入数据是有一个时间间隔的,为了避免用户那边出现查询不到数据或者查询的数据不完整的情况,我们就需要用到local_tmp表了。即先将Hive数据同步到local_tmp表然后在库内将数据迁移到local表,而页面查询则是基于local表之上的跨节点视图。库存数据迁移还分为两种情况,一种是全量替换,我们可以直接通过rename表来实现。前置和后置SQL如下:
平台自动生成的DataX同步配置如下:
第二种是部分数据替换,早期我们尝试过采用先delete后插入的方式,后面统一调整成为分区替换的方式。我们利用ClickHouse支持分区的特性,将local_tmp表和local表创建成为多分区的表,按月或者按日创建分区,然后通过ClickHouse的分区替换功能交换两张表的分区数据,这样就实现了数据的快速更新。分区替换模式需要在前置SQL里面清空目标表。
然后通过其它程序实现分区替换。
实现分区替换的逻辑其实非常简单,主要包括三个步骤:
第一步,查询表的分区信息。
SELECT partition,name,part_type,active,rows,bytes_on_disk,data_compressed_bytesfrom system.parts where table='dm_zy_pcbi_offline_stock_sale_local_tmp';
第二步,检查不同节点分区的数据是否一致。由于数据同步到ClickHouse以后,ClickHouse还需要进行节点间的数据复制,如果数据未复制完成就进行分区替换,结果会出现异常。这里就需要连上每一个实例节点,查询对应表的分区记录数。
select `partition`,sum(`rows`) from `system`.partswhere table ='dm_zy_pcbi_offline_stock_sale_local_tmp'--and `partition` ='202202'group by `partition`
第三步,从小到大执行分区替换。
ALTER TABLE dm_zy_pcbi_offline_stock_sale_local REPLACE PARTITION 202011 FROM dm_zy_pcbi_offline_stock_sale_local_tmp
总结一下,通过local_tmp表替换数据,有以下好处:
①清空local_tmp表对用户使用数据无影响,数据同步过程无感知;
②重命名表或者分区替换操作属于库内操作,时间非常短,用户无感知;
③数据同步失败的情况下,不影响报表使用上一个版本的数据;
④通过分区替换的方式,支持增量同步数据。
04实时数据
利用ClickHouse来完成实时数据接入,有两种方案分别对应ClickHouse提供的两种功能。
方案一:直接通过Flink写入ClickHouse。
从技术实现上也有两种方法,第一种是改写jdbc connector源码,增加ck方言,可以参考阿里云的文档:
https://help.aliyun.com/document_detail/175749.html。
第二种是直接引入flink-clickhouse-sink包,对应的github项目地址为:https://github.com/ivi-ru/flink-clickhouse-sink。
本次我们项目采用是FlinkSQL写入ClickHouse,采用的是第一种方法。通过FlinkSQL读取kafka数据,完成双流jion后直接写入ClickHouse。这里省略读取kafka的过程和双流join的代码,只保留写入ClickHouse的模板,简化代码如下:
由于Interval Join要数据库支持update和detele,ClickHouse虽然支持但是语法不正常,所以这个功能没能用起来,后面需要进一步优化。
方案二:直接通过ClickHouse读取kafka数据到内部表。
第一步,创建Kafka引擎表。
第二步,创建Kafka引擎表。
第三步,创建物化视图,持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式写入实体存储表。
数据接入以后,还需要对数据进行去重查询。为什么数据接入的时候不去重呢,因为ClickHouse唯一支持数据去重的引擎ReplacingMergeTree是以分区为单位删除重复数据的。只有在相同的数据分区内重复的数据才可以被删除,而不同数据分区之间的重复数据依然不能被剔除。
这个时候就用到了ClickHouse一个比较好用的语法 order by limit 1 by ,等于在一个语句里面实现了row _number去重(row _numbe去重需要嵌套两层查询)。
根据实际的业务场景,我们还需要在ClickHouse里面进行一些维度表关联,例如关联从hive同步过来的店铺和商品映射表,这些逻辑由于比较固定,并且维表数据量也比较大(在FlinkSQL中关联出现过内存溢出),所以我们选择创建ClickHouse视图来封装这些处理,举例如下:
目前系统按照方案一构建实时数据,且仅需当日的实时数据,页面查询结果的延时在1分钟以内。但是由于不能支持Interval Join,数据没办法保证100%准确性。如果能实现Interval Join,然后保证数据在24小时内可以多次修改,那么数据的准确性会进一步提高。
另外,根据实际应用情况,在接入实时数据时,Doris的优势比较明显。首先Doris对FlinkSQL的支持比较好,可以删除和修改数据;其次Doris的UNIQUE KEY可以自动完成数据去重;第三,Doirs对大表join支持更好,查询速度比ClickHouse更快。
05数据查询
至此,所有离线数据和实时数据都已经进入了ClickHouse,接下来就是BI报表和移动web查询了。
在PCBI方面,我们是通过数据集的方式构建查询结果集的。这里特别需要说到的一个场景是基于任意日期的本同期查询。
针对这个场景,我们做了以下优化:
①本同期数据只保留一份,即按照日期插入目标表的明细数据,通过union all查询+字段错位来获取本同期数据;
②在order_dt字段上增加分区和排序索引,并且order_dt作为order by字段的第一位;
③明细数据集字段尽可能精简,减少数据存储;
④维度表数据通过join来获取,页面会根据查询要求自动裁剪字段。
在移动BI方面主要是基于mybatis语法完成SQL语句的封装和参数的替换。这里的特殊场景就是基于任意日期的日、周、月、年、累计查询。
对此场景做的查询优化有:
①本同期数据只保留一份,即按照日期插入目标表的明细数据,通过union all查询+字段错位来获取本同期数据;
②通过mybatis的条件判断确定数据的过滤条件,增加代码复用率,并且保持都用biz_dt过滤;
③多个数据来源的数据拆分成不同的表,都保留相同的数据粒度,分别进行数据同步;
④所有底表的biz_dt字段上增加分区和排序索引,并且biz_dt作为order by字段的第一位;
⑤过滤条件也通过mybatis的判断语句添加,实现多种筛选和下钻复用同一个数据查询服务。
此外,还有实时数据和离线数据的联合查询,这个在PCBI和移动BI都有应用。
对此场景做的查询优化有:
①通过视图把离线数据和实时数据union all到一起;
②查询实时的时候通过参数条件直接跳过离线数据,查询离线数据的时候通过参数条件跳过实时数据;
③离线数据的开始日期、结束日期通过子查询来获取;
④在离线数据的order_dt字段上增加分区和排序索引,并且order_dt作为order by字段的第一位;
⑤在查询的外围嵌套维度表join,减少join的主表数据量。
在以上CK查询优化的基础上,目前80%的复杂查询可以在1s内返回,95%的查询在3s内完成,剩余少量需要count distinct的场景(无法提前预聚合)会稍慢一点。
经过半年的努力,项目完成各项开发目标,跑批和查询性能都达到了项目预期目标,满足上线条件并于2022年3月底正式上线。这篇“热气腾腾”的项目总结,既是对项目的一次复盘,也给想要使用Hive+ClickHouse构建数据仓库的朋友一些经验分享。
最后特别感谢Kenny.Wang和尘埃在项目过程中给予的指导和帮助,正是双方的密切配合和集思广益,才产出了这么多最佳实践经验,才有了项目的成功交付。