0
点赞
收藏
分享

微信扫一扫

2411rust,cargo清理缓存

互联网码农 2024-11-18 阅读 16

注意:Flink整合Hive后,可以用Hive的库和表,以及Hive中的函数方法,但是Hive不能使用Flink sql 里面的表,因为Hive不能进行流处理

这里Flink整合Hive,是将Flink的元数据保存到Hive的Metastore中,并可以直接使用Hive的库表;而下文与MySQL、HBase、Kafka的整合只是把它们作为数据源或数据汇使用,不涉及元数据保存。

一、Flink整合Hive

1、上传jar包,开启hive元数据

# 1. Upload the following jars into Flink's lib directory
#    (the first two lines below are jar file names to upload, not commands)
flink-sql-connector-hive-3.1.2_2.12-1.15.4.jar
mysql-connector-java-5.1.49.jar
cp /usr/local/soft/hadoop-3.1.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.1.jar /usr/local/soft/flink-1.15.4/lib/

# Swap in the full table planner -- without this, some SQL queries later may fail
cp flink-table-planner_2.12-1.15.4.jar ../lib/
mv flink-table-planner-loader-1.15.4.jar ../opt/

# 2. Restart the cluster (only needed if it was already running)
#    NOTE(review): the application id below is environment-specific -- replace with your own from `yarn application -list`
yarn application -list
yarn application -kill application_1731138432432_0001
yarn-session.sh -d

# 3. Start the Hive metastore service in the background
nohup hive --service metastore >> metastore.log 2>&1 &

# 4. Re-enter the Flink SQL client
sql-client.sh

2、创建hive catalog表

-- Register a Hive catalog backed by the Hive Metastore configured in hive-conf-dir
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/usr/local/soft/hive-3.1.3/conf'
);

-- List every registered catalog
SHOW CATALOGS;

-- Make the Hive catalog the current one
USE CATALOG hive_catalog;

-- Create a database inside the Hive catalog
CREATE DATABASE flink;

-- Make it the current database
USE flink;

-- Kafka-backed source table.
-- A table created here with Flink DDL is registered in the Hive Metastore,
-- but it can NOT be queried from Hive itself (Hive cannot read Flink connector tables).
CREATE TABLE students_json (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
) WITH (
'connector' = 'kafka', -- Kafka was integrated earlier; swap in your own source if needed
'topic' = 'students',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- start consuming from the earliest offset
'format' = 'json' -- message format: fields are parsed from JSON automatically
);

-- Query with the fully-qualified name: catalog.database.table
select * from hive_catalog.flink.students_json;

3、使用hive function

-- Load Hive's built-in functions as a Flink module so they can be used in Flink SQL
-- NOTE(review): hive-version here is 3.1.2 while the conf dir above points at hive-3.1.3 -- confirm which Hive version is actually installed
LOAD MODULE hive WITH ('hive-version' = '3.1.2');
select split('java,spark',',');

二、Flink整合Mysql

1、上传jar包

# 1. Upload the following jars into Flink's lib directory
#    (the two lines below are jar file names to upload, not commands)
flink-connector-jdbc-1.15.4.jar
mysql-connector-java-5.1.49.jar

# 2. Restart the cluster
#    NOTE(review): the application id is environment-specific -- replace with your own
yarn application -list
yarn application -kill application_1731138432432_0001
yarn-session.sh -d

# 3. Re-enter the Flink SQL client
sql-client.sh

2、Mysql Source

-- Bounded source: the JDBC connector scans the MySQL table once, then the stream ends
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/Test',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
);

SELECT * FROM students_jdbc;

3、Mysql Sink

-- Flink sink table (run in the Flink SQL client).
CREATE TABLE clazz_num_mysql (
clazz STRING,
num BIGINT,
PRIMARY KEY (clazz) NOT ENFORCED -- rows are upserted by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/Test',
'table-name' = 'clazz_num_mysql',
'username' ='root',
'password' ='123456'
);

-- Matching physical table (run in the MySQL client, database Test -- NOT in Flink).
CREATE TABLE clazz_num_mysql (
clazz varchar(255),
num BIGINT,
PRIMARY KEY (clazz) -- updates are applied by primary key
);

-- Continuously write the aggregated counts to MySQL
-- NOTE(review): students_text is defined in the Kafka section below -- it must exist first
insert into clazz_num_mysql
select clazz,count(1) as num
from
students_text
where clazz is not null
group by clazz;

三、Flink整合Hbase

1、上传jar包

# 1. Upload the following jar into Flink's lib directory
#    (the line below is a jar file name to upload, not a command)
flink-sql-connector-hbase-2.2-1.15.4.jar

# 2. Restart the cluster
#    NOTE(review): the application id is environment-specific -- replace with your own
yarn application -list
yarn application -kill application_1731138432432_0001
yarn-session.sh -d

# 3. Re-enter the Flink SQL client
sql-client.sh

2、hbase sink

-- Flink table mapping to HBase: row key = id, column family "info"
CREATE TABLE students_hbase (
id STRING,
info ROW<name STRING,age INT,sex STRING,clazz STRING>, -- one ROW field per qualifier in family "info"
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'students_flink',
'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'
);
-- Create the target table first (run in the HBase shell, NOT in Flink SQL)
create 'students_flink','info'

-- Write into HBase; nest the columns into the "info" row
-- NOTE(review): students_text is defined in the Kafka section below -- it must exist first
insert into students_hbase
select
id,
ROW(name,age,sex,clazz) as info
from
students_text
where clazz is not null;

-- The same table also works as a source; nested fields use dot syntax
select id,info.age from students_hbase;

四、Flink整合Kafka

1、上传jar包

# Upload the dependency jar into Flink's lib directory
# (the line below is a jar file name to upload, not a command)
flink-sql-connector-kafka-1.15.4.jar
# Restart the Flink cluster
# NOTE(review): the application id is environment-specific -- replace with your own
yarn application -list
yarn application -kill application_1730969357243_0005
yarn-session.sh -d

2、Kafka Source

-- Kafka source table: one CSV record per message
CREATE TABLE students_text (
    id STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students_text',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset', -- start consuming from the earliest offset
    'format' = 'csv',                        -- CSV: columns must be declared in field order
    'csv.ignore-parse-errors' = 'true'       -- skip malformed records instead of failing
);

3、Kafka Sink

  • 将仅追加的流写入kafka

-- Kafka sink table for an append-only stream
CREATE TABLE students_sink (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'students_sink',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- only relevant when reading this table back as a source
'format' = 'json' -- messages are serialized as JSON
);

-- Write the append-only stream into Kafka
insert into students_sink
select * from
students_text
where name is not null;

-- Check the result (run in a shell, NOT in Flink SQL)
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink
  • 将更新的流写入kafka

-- Kafka sink table for an updating (changelog) stream
CREATE TABLE clazz_num (
clazz STRING,
num BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'clazz_num',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- only relevant when reading this table back as a source
'format' = 'canal-json'
);

-- Plain JSON cannot encode an updating stream;
-- canal-json can, because it records insert/update/delete changes.

insert into clazz_num
select clazz,count(1) as num
from
students_text
where clazz is not null
group by clazz;

-- Check the result (run in a shell, NOT in Flink SQL)
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num

举报

相关推荐

0 条评论