代码出自 尚硅谷大数据, 我学的尚硅谷大数据,自己完成作业,然后照着敲了敲代码,加了一些注释, 把老师讲的话打字打了下来记录了一下, 并且整理了一下 发了个博客
sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1
TableAPI
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors._
/**
* 读取csv文件,然后tableAPI或者FlinkSql进行统计操作
*/
object TableApiTest5 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv = StreamTableEnvironment.create(env)
val filePath = "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv()) // 指定csv格式,就是逗号分割的格式,因为kafka输入的数据就是这个格式的.
//创建每个字段的名字和类型
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
// 创建表名
.createTemporaryTable("inputTable")
// 表的查询
val sensorTable: Table = tableEnv.from("inputTable")
//简单聚合,统计每个传感器温度个数
val aggResultTable: Table = sensorTable
.groupBy('id)
.select('id, 'id.count as 'count) // 根据Id统计
aggResultTable.toRetractStream[(String, Long)].print("tableAPI")
env.execute("table api test job")
}
}
输出结果:
tableAPI> (true,(sensor_1,1))
tableAPI> (true,(sensor_6,1))
tableAPI> (true,(sensor_7,1))
tableAPI> (true,(sensor_10,1))
tableAPI> (false,(sensor_1,1))
tableAPI> (true,(sensor_1,2))
tableAPI> (false,(sensor_1,2))
tableAPI> (true,(sensor_1,3))
tableAPI> (false,(sensor_1,3))
tableAPI> (true,(sensor_1,4))
解读,为什么还有false呢,
我们在流里面没有办法表示流的更改,但是我们可以输出两条信息,false的意思就是作废了,在下面就追加了一条数据,数据就是true了,这样就做了一个更新了,false就是删除操作,删除掉之前的老的数据,然后紧接着追加一行数据,是true,就是新的基于这个id统计的新的数据
因为前面四行的id都是之前没存在的,就打印进来了,而第五行sensor_1这个id和第一行的sensor_1是重复了,就打印了false,把这(sensor_1,1)删除掉,
但是紧接着第六行就打印了true,就是写入了(sensor_1,2),你会发现sensor_1,后面由1变成了2, 这就是追加了新的值的意思.
FlinkSql
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors._
/**
* 读取csv文件,然后tableAPI或者FlinkSql进行统计操作
*/
object TableApiTest5 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv = StreamTableEnvironment.create(env)
val filePath = "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv()) // 指定csv格式,就是逗号分割的格式,因为kafka输入的数据就是这个格式的.
//创建每个字段的名字和类型
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
// 创建表名
.createTemporaryTable("inputTable")
//SQL实现简单聚合
val aggResultSqlTable: Table = tableEnv.
sqlQuery("select id, count(id) as cnt from inputTable group by id")
// 转换成流打印输出
aggResultSqlTable.toRetractStream[(String, Long)].print("agg")
env.execute("table api test job")
}
}
输出结果
agg> (true,(sensor_1,1))
agg> (true,(sensor_6,1))
agg> (true,(sensor_7,1))
agg> (true,(sensor_10,1))
agg> (false,(sensor_1,1))
agg> (true,(sensor_1,2))
agg> (false,(sensor_1,2))
agg> (true,(sensor_1,3))
agg> (false,(sensor_1,3))
agg> (true,(sensor_1,4))
这个输出结果和上面的TableAPI案例是一样的,就不说明了.