1.文档编写目的
本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Hive,StreamSets的流程处理如下:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n8LehaLw-1645415593760)(https://ask.qcloudimg.com/http-save/yehe-1522219/cr56v67c6f.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/2A1ad3aUHM.png)
- 内容概述
1.测试环境准备
2.配置StreamSets
3.创建Pipline及测试
4.总结
- 测试环境
1.RedHat7.3
2.CM和CDH版本为cdh5.13.3
3.Kafka2.2.0(0.10.2)
4.StreamSets3.3.0
- 前置条件
1.集群已启用Kerberos
2.集群已安装Kafka并启用Kerberos
2.测试环境准备
1.准备一个访问Kerberos环境的Kafka的fayson.keytab文件
[root@cdh01 ~]# kadmin.local
kadmin.local: listprincs fayson*
fayson@FAYSON.COM
kadmin.local: xst -norandkey -k fayson.keytab fayson@FAYSON.COM
(可左右滑动)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-365dJkK7-1645415593762)(https://ask.qcloudimg.com/http-save/yehe-1522219/aboosg6vpu.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/00GT2889L5.png)
fayson.keytab主要在向Kafka生产消息和StreamSets消费Kafka数据时使用。
2.准备向Kerberos环境的Kafka集群生产数据脚本
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wRpOiAss-1645415593764)(https://ask.qcloudimg.com/http-save/yehe-1522219/f2my8srjar.png?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/54CRdNDCJ1.png)
该脚本用于向Kafka发送JSON数据,脚本说明:
run.sh:向Kafka指定topic生产数据的脚本
ods_user_600.txt:发送到Kafka的测试数据,共600条测试数据,数据的id是唯一的。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jhFno3dR-1645415593766)(https://ask.qcloudimg.com/http-save/yehe-1522219/q903saz4jq.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/GeFfc636Ra.png)
注意:发送数据的示例代码是将ods_user_600.txt的每条数据转换为json格式了,示例数据如下:
{
"occupation": "生产工作、运输工作和部分体力劳动者",
"address": "台东东二路16号-8-8",
"city": "长治",
"marriage": "1",
"sex": "1",
"name": "仲淑兰",
"mobile_phone_num": "13607268580",
"bank_name": "广州银行31",
"id": "510105197906185179",
"child_num": "1",
"fix_phone_num": "15004170180"
}
(可左右滑动)
lib:Fayson打包的示例jar包及Kafka的依赖包
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WTIPhzZi-1645415593768)(https://ask.qcloudimg.com/http-save/yehe-1522219/rnjig3v8j0.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/b2Q6F47d7D.png)
conf:示例代码运行的配置文件
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ViZKPzxV-1645415593769)(https://ask.qcloudimg.com/http-save/yehe-1522219/w7c893agwn.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/bdHd461MIO.png)
该脚本运行主要依赖0286.properties、jaas.conf、krb5.conf和fayson.keytab文件。
0286.properties配置文件,将如下配置修改为自己集群的环境:
[root@cdh04 conf]# vim 0286.properties
bootstrap.servers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
topic.name=kafka_hive_topic
krb5.debug=false
group.id=testgroup
(可左右滑动)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-G2FMkAPD-1645415593771)(https://ask.qcloudimg.com/http-save/yehe-1522219/w7exn0sz5n.png?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/RZ47Cd48Q2.png)
jaas.conf文件内容如下:
[root@cdh04 conf]# vim jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
principal="fayson@FAYSON.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
principal="fayson@FAYSON.COM";
};
(可左右滑动)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MGdIWKjF-1645415593772)(https://ask.qcloudimg.com/http-save/yehe-1522219/s7z3iyb17g.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/eIe1Q0eIff.png)
krb5.conf:拷贝集群的krb5.conf文件到conf目录下即可。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e4xh6Exw-1645415593774)(https://ask.qcloudimg.com/http-save/yehe-1522219/3q7jkr3ybq.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/WLX955641e.png)
3.通过如下命令创建测试topic
[root@cdh04 conf]# kafka-topics --create --zookeeper cdh01.fayson.com:2181 --replication-factor 3 --partitions 1 --topic kafka_hive_topic
(可左右滑动)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fgS0uIdi-1645415593776)(https://ask.qcloudimg.com/http-save/yehe-1522219/963iqcuu8i.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/XDW1e8GJS6.png)
4.通过Hue为sdc用户授权
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HCxr3a42-1645415593779)(https://ask.qcloudimg.com/http-save/yehe-1522219/3ilbygx77y.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/34d15356RP.png)
授予default库的所有权限以及/user/hive/warehouse目录的URI权限,否则sdc用户无法创建表。
3.StreamSets配置
由于Kafka集群启用了Kerberos,所以这里在使用StreamSets消费Kafka数据之前,需要配置StreamSets访问Kafka的Kerberos用户信息,具体配置如下:
1.登录Cloudera Manager并进入StreamSets服务
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PEHRbVdc-1645415593780)(https://ask.qcloudimg.com/http-save/yehe-1522219/jo965mcudj.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/33d6VfbA5Y.png)
2.点击“配置”,搜索“sdc_java_opts”,在该配置项中增加如下内容
-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf
(可左右滑动)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vYtJUCah-1645415593781)(https://ask.qcloudimg.com/http-save/yehe-1522219/dpv7f16br0.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/6D540R6cdZ.png)
注意:jaas.conf文件需要存在于StreamSets的Data Collector服务所在节点的指定目录下。
4.创建StreamSets的Pipline
1.登录StreamSets,创建一个kafka2kudu的Pipline
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SGv28TRN-1645415593783)(https://ask.qcloudimg.com/http-save/yehe-1522219/75ouqxbng3.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/W52F45OE50.png)
2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GRiLhWBM-1645415593785)(https://ask.qcloudimg.com/http-save/yehe-1522219/fiyzbtb42p.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/4FX6M69ba4.png)
注意:Kafka的版本选择,这里Fayson选择的Apache的Kafka,在Kerberos环境下选择CDH对应的Kafka版本会报“Couldnot get partition count for topic ‘kafka_hive_topic’”
配置Kafka相关信息,如Broker、ZK、Group、Topic及Kerberos信息

注意:访问Kerberos环境的Kafka需要配置security.protocol=SASL_PLAINTEXT和sasl.kerberos.service.name=kafka两个参数。
配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U1H5WxNR-1645415593786)(https://ask.qcloudimg.com/http-save/yehe-1522219/c3x1nzc0x3.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/8ea17EB8Ta.png)
3.添加Hive Metadata中间处理模块,选择对应的CDH版本
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GmldoOLi-1645415593788)(https://ask.qcloudimg.com/http-save/yehe-1522219/3rmsuhepbe.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/088H4C4e64.png)
配置Hive的JDBC信息
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TKJwBSHu-1645415593789)(https://ask.qcloudimg.com/http-save/yehe-1522219/2otvqyrgo6.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/OX61VcRaQ7.png)
注意:这里访问Hive的JDBC连接,需要添加Kerberos信息,由于无法通过StreamSets界面指定我们自定义用户访问Hive,所以这里默认使用的是StreamSets的sdc用户,如果集群启用了Sentry则需要注意为sdc用户授权,否则无法创建hive表和写数据。
配置Hive的表信息,指定表名和库名
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZOv8k45c-1645415593790)(https://ask.qcloudimg.com/http-save/yehe-1522219/xhywdbw1m3.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/90DQ6f5H56.png)
指定数据格式,指定为Avro,选项中有parquet格式,但在后续处理中并不支持parquet格式
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sdz1rbFL-1645415593791)(https://ask.qcloudimg.com/http-save/yehe-1522219/rgl0jft0yk.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/11fd14bD94.png)
4.添加Hadoop FS处理模块,主要用于将HiveMetadata的数据写入HDFS
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j6EqOlFT-1645415593793)(https://ask.qcloudimg.com/http-save/yehe-1522219/v0u4w00ulj.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/G90N9c8O6O.png)
配置Hadoop FS,配置HDFS URL和是否启用Kerberos认证
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RTiMG8WL-1645415593794)(https://ask.qcloudimg.com/http-save/yehe-1522219/03li2rut9p.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/2M95c76C6A.png)
配置Hadoop FS的Out Files
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oI1ZLUFU-1645415593795)(https://ask.qcloudimg.com/http-save/yehe-1522219/z8v96ef1y5.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/13403Be1PS.png)
注意:勾选“Directory in Header”使HDFS写入数据时使用上一步中Hive Metadata模块传递的目录,“IdleTimeout”主要是用于指定Hadoop FS模块空闲多久则将数据刷到HDFS数据目录。
配置Late Records参数,使用默认参数即可
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qqrQDLUv-1645415593797)(https://ask.qcloudimg.com/http-save/yehe-1522219/rqdmscdmhm.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/35OH4M66ca.png)
指定写入到HDFS的数据格式
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ds3VpJus-1645415593799)(https://ask.qcloudimg.com/http-save/yehe-1522219/5czmvjud31.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/A064378520.png)
5.添加Hive Metastore模块,该模块主要用于向Hive库中创建表
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XHA6Mo3b-1645415593800)(https://ask.qcloudimg.com/http-save/yehe-1522219/v4efh8cqwv.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/0118efaXRD.png)
配置Hive信息,JDBC访问URL
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N0eFk6gI-1645415593804)(https://ask.qcloudimg.com/http-save/yehe-1522219/5dqo5iww7m.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/0765R6UYS5.png)
Hive Metastore的高级配置
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4zSYTVYc-1645415593805)(https://ask.qcloudimg.com/http-save/yehe-1522219/zys0tbdgm9.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/B2ZAV20D24.png)
6.点击校验流程,如下图所示则说明流程正常
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SAJaRAC3-1645415593806)(https://ask.qcloudimg.com/http-save/yehe-1522219/p8smtilc97.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/4BS2312933.png)
到此为止完成了Kafka数据到Hive的流程配置。
5.流程测试验证
1.启动kafka2hive的Pipline,启动成功如下图显示
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bJxkO2XM-1645415593807)(https://ask.qcloudimg.com/http-save/yehe-1522219/neuelfjsj9.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/1Na9WPA982.png)
2.在命令行运行run.sh脚本向Kafka发送消息
[root@cdh04 ~]# cd /data/disk1/0286-kafka-shell/
[root@cdh04 0286-kafka-shell]# sh run.sh ods_user_600.txt
(可左右滑动)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4N6S8mez-1645415593810)(https://ask.qcloudimg.com/http-save/yehe-1522219/l8n1n114im.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/536K5UH58f.png)
3.在StreamSets中查看kafka2hive的pipline运行情况
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Um6G6pUY-1645415593810)(https://ask.qcloudimg.com/http-save/yehe-1522219/xowov2iyud.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/Pbff0ac2WL.png)
4.使用sdc用户登录Hue查看ods_user表数据
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Fzn2NaPi-1645415593812)(https://ask.qcloudimg.com/http-save/yehe-1522219/3ju8eafn8v.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/5M83dd61f9.png)
入库的数据总条数
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xV1wUZWW-1645415593813)(https://ask.qcloudimg.com/http-save/yehe-1522219/a860obzx4j.jpeg?imageView2/2/w/1620)]](https://file.cfanz.cn/uploads/png/2022/02/21/4/35424ec6cZ.png)
可以看到ods_user表的总条数与准备的测试数据量一致。
6.总结
1.Kafka集群启用了Kerberos后,StreamSets的Kafka模块在消费数据时需要在sdc_java_opt中加载jaas.conf,指定消费Kafka数据的Kerberos账号。
2.Hive Metadata模块主要是用于将Kafka的JSON数据进行封装分流处理,data数据交给HDFS模块,MetaData数据交个HiveMetastore模块,HDFS模块主要用于写数据到hive表的数据目录,HiveMetastore主要用于判断表是否存在是否需要创建表。
3.由于HiveMetastore模块无法指定自定义的Kerberos账号,默认使用sdc用户访问Hive,在启用Sentry的集群则需要为sdc用户授权,否则无权限创建表。
4.HDFS模块在接收到HiveMetadata模块的数据后生成的为临时文件,不是立即将数据写入到HDFS,可以通过“Idle Timeout”参数来控制刷新数据到HDFS的频率。










