0
点赞
收藏
分享

微信扫一扫

Flink集成Hudi及示例

生活记录馆 2022-03-31 阅读 34

文章目录

环境准备

在编译Hudi之前,要保证自己的机器上搭建了Hadoop集群、Hive。

安装Maven

Maven的版本选择的是3.5.4,可以根据自己的需求选择
官网地址: https://maven.apache.org/download.cgi

  1. 将apache-maven-3.5.4-bin.tar.gz上传到linux的/opt/software目录下
  2. 解压apache-maven-3.5.4-bin.tar.gz到/opt/module/目录下面
tar -zxvf apache-maven-3.5.4-bin.tar.gz -C /opt/module/
  1. 修改apache-maven-3.5.4的名称为maven
cd /opt/module/
mv apache-maven-3.5.4/ maven
  1. 添加环境变量到/etc/profile中
vim /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/maven
export PATH=$PATH:$MAVEN_HOME/bin
  1. 使环境变量生效
source /etc/profile
  1. 测试是否安装成功
mvn -v

在这里插入图片描述
7. 修改setting.xml,指定为阿里云,下载依赖可以快一些

 vim maven/conf/settings.xml
<!-- 添加阿里云镜像-->
<mirror>
        <id>nexus-aliyun</id>
        <mirrorOf>central</mirrorOf>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>

编译Hudi

  1. 下载地址:https://www.apache.org/dyn/closer.lua/hudi/0.10.1/hudi-0.10.1.src.tgz
  2. 下载后将其放到/opt/software目录下(也可以采用git的方式下载)
  3. 将其解压到/opt/module下
tar -zxvf hudi-0.10.1.src.tgz -C /opt/module
  1. 修改pom.xmk文件
cd hudi-0.10.1
vim pom.xml
<repository>
        <id>nexus-aliyun</id>
        <name>nexus-aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
  1. 构建编译
    在hudi-1.10.1目录下,执行如下命令
mvn clean package -DskipTests -Dspark3 -Dscala-2.12

大约会耗费十几分钟,当出现如下日志,说明编译成功:
在这里插入图片描述
打开hudi下的packing目录,发现地下生成了许多jar包
在这里插入图片描述

安装Flink

由于我安装的是最新版的hudi-0.10.1,所以需要安装官方建议的Flink1.13.x版本的Flink。
附上Hudi文档的地址: https://hudi.apache.org/docs/flink-quick-start-guide
在这里插入图片描述

  1. 将flink-1.13.6-bin-scala_2.11.tgz上传到linux的/opt/software目录下
  2. 解压到/opt/module下
tar -zxvf flink-1.13.6-bin-scala_2.11.tgz -C /opt/module/
  1. 添加hadoop环境变量
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3
export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
export HADOOP_CLASSPATH=`hadoop classpath`

注意与自己的Hadoop安装位置匹配
4. 复制jar包

cd /opt/module/Hudi/packaging/hudi-flink-bundle/target/
cp hudi-flink-bundle_2.12-0.10.1.jar /opt/module/flink-1.13.6/lib
  1. 启动Flink集群
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
cd /opt/module/flink-1.13.6/bin/
./start-cluster.sh

在这里插入图片描述
6. 启动Flink客户端

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded shell

在这里插入图片描述
在这里插入图片描述
7. 建表

#设置返回结果模式为tableau,让结果直接显示
set sql-client.execution.result-mode=tableau;

注意这个版本不同,命令不一样

CREATE TABLE table1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop102:8020/t1',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

在这里插入图片描述

  1. 插入数据
INSERT INTO table1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

在这里插入图片描述
访问Flink的端口8081,可以看到任务已经完成
hadoop102:8081
在这里插入图片描述
再访问HDFS的端口号,可以看到数据已经存入HDFS
在这里插入图片描述
9. 查询

select * from table0;

在这里插入图片描述
10. 更新数据

insert into table0 values ('id1','Danny',55,TIMESTAMP '1970-01-01 00:00:01','par1');

查询后可以观察到id为1的用户年龄已修改
至此,已经可以通过Filnk操作Hudi将数据写入到HDFS,并且可以查询,且支持修改

举报

相关推荐

0 条评论