Versions: Flink 1.13.1, Hive 2.1.1
From the Flink distribution directory, start a detached YARN session on queue root.test:
bin/yarn-session.sh -nm testFlink2Hive -d -qu root.test -jm 1024 -tm 1024
Then open the Flink SQL client in embedded mode:
bin/sql-client.sh embedded
SET execution.checkpointing.interval = 12h;
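Data written by the streaming Hive sink only becomes visible when a checkpoint completes, so this interval also bounds how quickly partitions can be committed. The 1.13 SQL client accepts the quoted form of SET as well, e.g.:
SET 'execution.checkpointing.interval' = '12h';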
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'flink_test',
'hive-conf-dir' = '/etc/hive/conf/'
);
USE CATALOG myhive;
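With the catalog registered and selected, listing databases and tables is a quick check that Flink can reach the Hive metastore (the output depends on your environment):
SHOW DATABASES;
SHOW TABLES;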
Create the Kafka source table inside the Hive catalog (it is stored as a generic table that only Flink can read). The column names are camelCase so they line up with the fields referenced in the INSERT at the end:
CREATE TABLE IF NOT EXISTS flink_test.user_test (
  uid STRING,
  userIp STRING,
  countryName STRING,
  countryCode STRING,
  regionName STRING,
  cityName STRING,
  visitTime TIMESTAMP(0),
  WATERMARK FOR visitTime AS visitTime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_test_topic',
  'properties.group.id' = 'user_test_consumer1',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092',
  'format' = 'json'
);
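The json format maps columns to JSON keys by name, and the lookup is case-sensitive, so the column names above must mirror the keys in the Kafka messages. Assuming the topic already has data, a quick smoke test in the SQL client is:
SELECT uid, cityName, visitTime FROM flink_test.user_test;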
Switch to the Hive dialect for the sink DDL. The sink.* and partition.time-extractor.* entries in TBLPROPERTIES are Flink streaming-sink options picked up by the Hive connector, not Hive settings:
SET table.sql-dialect=hive;
CREATE EXTERNAL TABLE IF NOT EXISTS flink_test.test_flink2hive (
  uid string,
  user_ip string,
  country_name string,
  country_code string,
  region_name string,
  city_name string,
  visit_time string
) PARTITIONED BY (dt string)
STORED AS PARQUET
TBLPROPERTIES (
  'parquet.compression' = 'SNAPPY',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'sink.partition-commit.success-file.name' = '_SUCCESS',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1h',
  'partition.time-extractor.timestamp-pattern' = '$dt',
  'sink.rolling-policy.rollover-interval' = '1h',
  'auto-compaction' = 'true'
);
Switch back to the default dialect for the streaming INSERT. Enabling dynamic table options also allows per-query overrides through /*+ OPTIONS(...) */ hints, as in the example below:
SET table.sql-dialect=default;
SET table.dynamic-table-options.enabled=true;
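For example, an ad-hoc query (illustrative only, not part of the pipeline) could override the Kafka start position through a hint instead of editing the table definition:
SELECT uid, cityName, visitTime
FROM flink_test.user_test /*+ OPTIONS('scan.startup.mode'='latest-offset') */;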
Submit the continuous INSERT that streams from Kafka into the partitioned Hive table, deriving the dt partition from the event time:
INSERT INTO flink_test.test_flink2hive
SELECT uid,
       userIp AS user_ip,
       countryName AS country_name,
       countryCode AS country_code,
       regionName AS region_name,
       cityName AS city_name,
       CAST(visitTime AS STRING) AS visit_time,
       SUBSTR(CAST(visitTime AS STRING), 1, 10) AS dt
FROM flink_test.user_test;
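After the job has taken a checkpoint and the one-hour partition-commit delay has passed, the committed partitions can be verified from the Hive side (run in beeline or the Hive CLI; the partition values depend on your data):
SHOW PARTITIONS flink_test.test_flink2hive;
SELECT * FROM flink_test.test_flink2hive LIMIT 10;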
The End.