1 环境准备
1)在eclipse中创建一个java工程
2)在工程的根目录创建一个lib文件夹
3)解压kafka安装包,将安装包libs目录下的jar包拷贝到工程的lib目录下,并build path。
4)启动zk和kafka集群,在kafka集群中打开一个消费者
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
2 Kafka生产者Java API
2.1 创建生产者(过时的API)
package com.atguigu.kafka; |
2.2 创建生产者(新API)
package com.atguigu.kafka; |
2.3 创建生产者带回调函数(新API)
package com.atguigu.kafka; |
2.4 自定义分区生产者
0)需求:将所有数据存储到topic的第0号分区上
1)定义一个类实现Partitioner接口,重写里面的方法(过时API)
package com.atguigu.kafka; |
2)自定义分区(新API)
package com.atguigu.kafka; |
3)在代码中调用
package com.atguigu.kafka; |
4)测试
(1)在hadoop102上监控/opt/module/kafka/logs/目录下first主题3个分区的log日志动态变化情况
[atguigu@hadoop102 first-0]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-1]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-2]$ tail -f 00000000000000000000.log
(2)发现数据都存储到指定的分区了。
3 Kafka消费者Java API
0)在控制台创建发送者
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
1)创建消费者(过时API)
package com.atguigu.kafka.consume; |
2)官方提供案例(自动维护消费情况)(新API)
package com.atguigu.kafka.consume; |