Flink Common APIs: Kafka Source

Kafka-based Source
First, add the Kafka connector dependency to the project. The full list of available connectors is documented on the official site: https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
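
If the project builds with sbt instead of Maven, the same coordinates can be declared in build.sbt; a minimal equivalent (same artifact and version as above):

libraryDependencies += "org.apache.flink" % "flink-connector-kafka_2.11" % "1.9.1"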

Approach 1: Reading plain String data from Kafka

package source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

import java.util.Properties

/**
 * @Author yqq
 * @Date 2021/12/25 15:35
 * @Version 1.0
 */
object KafkaSource {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(1)
    // Import the implicit conversions for the Scala DataStream API
    import org.apache.flink.streaming.api.scala._

    // Connect to Kafka; the records in the topic are plain strings
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    properties.setProperty("group.id", "flink01")
    properties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("auto.offset.reset", "latest")

    environment.addSource(new FlinkKafkaConsumer[String]("topic_01", new SimpleStringSchema(), properties))
      .print()
    environment.execute()
  }
}
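
With auto.offset.reset set to latest, a new consumer group starts at the end of the topic. The connector also lets the start position be set explicitly on the consumer instance; a minimal sketch reusing the properties above (method names from the Flink 1.9 Kafka connector):

val consumer = new FlinkKafkaConsumer[String]("topic_01", new SimpleStringSchema(), properties)
consumer.setStartFromEarliest()         // ignore committed offsets, read the topic from the beginning
// consumer.setStartFromLatest()        // start from the newest records
// consumer.setStartFromGroupOffsets()  // default: resume from the group's committed offsets
environment.addSource(consumer).print()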

Approach 2: Reading key/value data from Kafka
Define the consumer

package source

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

import java.util.Properties

/**
 * @Author yqq
 * @Date 2021/12/25 16:37
 * @Version 1.0
 */
// 2. Import the implicit conversions for the Scala DataStream API
import org.apache.flink.streaming.api.scala._

object KafkaSourceKeyValue {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Flink streaming environment
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism: every operator defaults to a parallelism of 1
    streamEnv.setParallelism(1)

    // Kafka connection properties
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
    props.setProperty("group.id", "flink01")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset", "latest")

    // Set up the Kafka source with the custom key/value deserialization schema
    val stream: DataStream[(String, String)] = streamEnv.addSource(
      new FlinkKafkaConsumer[(String, String)]("t_bjsxt", new MyKafkaReader, props))

    stream.print()

    streamEnv.execute()
  }

  // Custom schema that reads key/value pairs from Kafka
  class MyKafkaReader extends KafkaDeserializationSchema[(String, String)] {
    // Whether the stream has ended: false keeps the source running indefinitely
    override def isEndOfStream(nextElement: (String, String)): Boolean = false

    // Deserialize one Kafka record into a (key, value) tuple
    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
      if (record != null) {
        var key = "null"
        var value = "null"
        if (record.key() != null) {
          key = new String(record.key(), "UTF-8")
        }
        if (record.value() != null) { // extract the value from the Kafka record
          value = new String(record.value(), "UTF-8")
        }
        (key, value)
      } else { // the record is null
        ("null", "null")
      }
    }

    // Tell Flink which type this schema produces
    override def getProducedType: TypeInformation[(String, String)] =
      createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
  }
}
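
Once the records arrive as (String, String) tuples, ordinary DataStream operators apply directly. For example, a running count per message key (a sketch building on the stream value above):

stream
  .map(t => (t._1, 1))
  .keyBy(0)  // key by the Kafka message key
  .sum(1)    // running count per key
  .print()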

Define the producer

package source

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.{Properties, Random}

/**
 * @Author yqq
 * @Date 2021/12/25 17:12
 * @Version 1.0
 */
object MyKafkaProducer {
  def main(args: Array[String]): Unit = {
    // Kafka connection properties
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    val r = new Random()
    while (true) { // endless loop producing key/value records once per second
      // Note: to feed KafkaSourceKeyValue above, this topic must match the one it subscribes to (t_bjsxt)
      val data = new ProducerRecord[String, String]("topic_01", "key" + r.nextInt(10), "value" + r.nextInt(100))
      producer.send(data)
      Thread.sleep(1000)
    }
    producer.close() // never reached because of the endless loop above
  }
}
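
KafkaProducer.send() is asynchronous, so failures surface only later. For debugging, a delivery callback can be attached to each send (a sketch using the standard Kafka producer Callback API):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

producer.send(data, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace() // delivery failed
    else println(s"sent to ${metadata.topic()}-${metadata.partition()}@${metadata.offset()}")
  }
})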
