0
点赞
收藏
分享

微信扫一扫

flume使用hive stream写入到hive

1、hive中创建表:

create table customers (id string, name string, email string, street_address string, company string)
partitioned by (time string)
clustered by (id) into 5 buckets stored as orc
location '/user/iteblog/salescust'
TBLPROPERTIES ('transactional'='true');

注意:采用orc存储,同时支持clustered

为了在Hive中启用事务,我们需要在Hive中进行如下的配置:

hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager


2、配置flume:

$ vi flumetohive.conf
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000

# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
flumeagent1.sinks.hive_sink.hive.database = raj
flumeagent1.sinks.hive_sink.hive.table = customers
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel

1)source使用的是kafka;

2)sink使用的是hive,这里没有使用hdfs+hive创建外表的方式,主要是因为flume的hive sink内部使用了hive stream来做的orc文件追加,好处是文件小而且效率高。


参考:https://www.iteblog.com/archives/1771.html


举报

相关推荐

0 条评论