0
点赞
收藏
分享

微信扫一扫

Flink将数据输出到Kafka的指定topic中

8052cf60ff5c 2022-07-12 阅读 61

sensor.txt

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1

代码

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}

/**
* 从文件里面读取数据,处理完了在输出到Kafka中.
*/
object KafkaTableTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)


val filePath = "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv()) // 定义读取数据之后的格式化方法
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
) // 定义表结构
.createTemporaryTable("inputTable") // 注册一张表


// 做转换操作
// 对Table进行转换操作,得到结果表
val sensorTable: Table = tableEnv.from("inputTable")
val resultTable: Table = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")

// 定义一个连接到kafka的输出表
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sinkTest")
.property("bootstrap.servers", "zjj101:9092")
.property("zookeeper.connect", "zjj101:2181,zjj102:2181,zjj103:2181")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaOutputTable")
// 将结果表输出
resultTable.insertInto("kafkaOutputTable")


val table = tableEnv.sqlQuery("select id,temp from kafkaOutputTable")
table.toAppendStream[(String, Double)].print("test")
env.execute("kafka table test")
}
}

启动Kafka消费者

[root@zjj101 ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server zjj101:9092 --topic  sinkTest

启动Flink程序

IDEA控制台输出:

test> (sensor_1,35.8)
test> (sensor_1,37.2)
test> (sensor_1,33.5)
test> (sensor_1,38.1)

查看消费者控制台输出框

[root@zjj101 ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server zjj101:9092 --topic  sinkTest
sensor_1,35.8

sensor_1,37.2

sensor_1,33.5

sensor_1,38.1

可以发现输出出来东西了.


举报

相关推荐

0 条评论