0
点赞
收藏
分享

微信扫一扫

一个flink+kafka+hive示例

烟中雯城 2022-04-14 阅读 88
flinkhive

版本:Flink 1.13.1,Hive 2.1.1

进入 Flink 解压目录,依次执行以下命令:

# Start a detached Flink session on YARN (application name testFlink2Hive,
# queue root.test), then open the SQL client in embedded mode.
bin/yarn-session.sh \
    --name testFlink2Hive \
    --detached \
    --queue root.test \
    --jobManagerMemory 1024 \
    --taskManagerMemory 1024
bin/sql-client.sh embedded

SET execution.checkpointing.interval = 12h;

-- Session setup. The SET statement above configures a 12h checkpoint interval.
-- NOTE(review): the streaming Hive sink presumably finalizes files and commits
-- partitions only when a checkpoint completes, so data may become visible in
-- Hive at most once every 12h. Confirm that this latency is intended.
--
-- The statements below register the Hive Metastore as catalog myhive (Hive
-- configuration read from /etc/hive/conf/, default database flink_test) and
-- make it the current catalog.
CREATE CATALOG myhive
WITH (
    'type'             = 'hive',
    'hive-conf-dir'    = '/etc/hive/conf/',
    'default-database' = 'flink_test'
);

USE CATALOG myhive;

-- Kafka-backed source table: JSON records from topic user_test_topic, read
-- from the earliest offset by consumer group user_test_consumer1.
-- visittime is the event-time attribute, and its watermark trails it by
-- 5 seconds.
-- NOTE(review): confirm that these column names match the JSON keys in the
-- topic exactly, including letter case.
CREATE TABLE IF NOT EXISTS flink_test.user_test (
    uid         STRING,
    userip      STRING,
    countryname STRING,
    countrycode STRING,
    regionname  STRING,
    cityname    STRING,
    visittime   TIMESTAMP(0),
    WATERMARK FOR visittime AS visittime - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'user_test_topic',
    'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092',
    'properties.group.id' = 'user_test_consumer1',
    'scan.startup.mode' = 'earliest-offset'
);

set table.sql-dialect=hive;

-- Hive sink table, created with the Hive dialect: external, Parquet with
-- SNAPPY compression, partitioned by the string column dt.
--
-- Partition commit: the partition-time trigger commits a partition once the
-- watermark passes the time extracted from dt plus the commit delay. A commit
-- adds the partition to the metastore and writes a _SUCCESS file.
-- dt holds one calendar day, so the delay is one day. A 1h delay would mark
-- each daily partition complete about one hour after its own midnight, while
-- that day is still receiving data.
--
-- Part files roll over after 1h and auto-compaction is enabled.
CREATE EXTERNAL TABLE IF NOT EXISTS flink_test.test_flink2hive (
  uid string
  , user_ip string
  , country_name string
  , country_code string
  , region_name string
  , city_name string
  , visit_time string
  ) PARTITIONED BY (dt string)
  STORED AS parquet
  TBLPROPERTIES (
  'parquet.compression'='SNAPPY',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'sink.partition-commit.success-file.name' = '_SUCCESS',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 d',
  'partition.time-extractor.timestamp-pattern'='$dt',
  'sink.rolling-policy.rollover-interval'='1h',
  'auto-compaction'='true'
  );

set table.sql-dialect=default;

set table.dynamic-table-options.enabled=true;

-- Streaming job: copy every record of the Kafka source table into the Hive
-- table, renaming the columns to snake_case.
-- Source columns are written exactly as declared on flink_test.user_test
-- (all lowercase). Flink SQL identifiers are case-sensitive, so camelCase
-- references such as userIp do not resolve against that table.
-- visit_time is the string form of visittime, and dt (the partition column)
-- is its first 10 characters, presumably the yyyy-MM-dd date part.
INSERT INTO flink_test.test_flink2hive
SELECT
    uid                                      AS uid,
    userip                                   AS user_ip,
    countryname                              AS country_name,
    countrycode                              AS country_code,
    regionname                               AS region_name,
    cityname                                 AS city_name,
    CAST(visittime AS STRING)                AS visit_time,
    SUBSTR(CAST(visittime AS STRING), 1, 10) AS dt
FROM flink_test.user_test;

The End.

举报

相关推荐

0 条评论