Table API and SQL: Mixing Table and SQL

SQL is one of the most important interfaces Flink provides, mainly because its flexible and rich syntax covers the majority of computation scenarios. Under the hood, Flink SQL uses the Apache Calcite framework to parse standard SQL statements and translate them into the underlying operator logic, applying rule-based optimizations such as predicate pushdown along the way. SQL also shields users from low-level technical details, so Flink applications can be built more conveniently and efficiently with plain SQL statements. Flink SQL is built on top of the Table API and covers most of its features. The two can also be mixed freely: Flink ultimately merges the Table API calls and the SQL query into a single set of execution logic.
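The core of the mixing is that a Table object can be interpolated directly into a SQL string: Flink registers the interpolated Table under a generated name and plans the SQL query and the Table API calls together. A minimal sketch of the pattern, assuming a StreamTableEnvironment named tableEnv and a DataStream[(String, Long)] named stream (both names are assumptions for illustration):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

// Turn the stream into a Table with named fields, then reference that Table
// inside a SQL query via Scala string interpolation.
val calls: Table = tableEnv.fromDataStream(stream, 'sid, 'duration)
val perStation: Table = tableEnv.sqlQuery(
  s"SELECT sid, SUM(duration) AS total_duration FROM $calls GROUP BY sid")

The full example below follows the same pattern, but reads its data from a CSV file instead of a DataStream.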

Data source (data/statefile.log). Each line contains the station id (sid), caller (call_out), callee (call_input), call type (call_type), call timestamp in epoch milliseconds (call_time), and call duration (duration):

station_0,18600007253,18900002970,barring,1640516925858,12
station_9,18600002949,18900000315,barring,1640516925859,18
station_6,18600003061,18900008200,success,1640516925859,8
station_9,18600001600,18900005953,basy,1640516925860,15
station_9,18600006517,18900008415,basy,1640516925860,5
station_7,18600004062,18900006955,success,1640516925860,14
station_2,18600006911,18900009173,fail,1640516925860,4
station_6,18600002917,18900002111,success,1640516925860,1
station_2,18600007169,18900001408,fail,1640516925860,17
station_7,18600007118,18900008130,basy,1640516927882,7
station_8,18600005437,18900009670,success,1640516927882,19
station_7,18600009499,18900000353,fail,1640516927882,12
station_0,18600004698,18900005849,success,1640516927882,1
station_2,18600001487,18900006475,fail,1640516927882,10
station_2,18600004219,18900000080,barring,1640516927883,18
station_3,18600002694,18900005832,barring,1640516927884,3
station_2,18600001469,18900001268,barring,1640516927884,10
station_9,18600004173,18900005859,success,1640516927884,1
station_2,18600006412,18900008667,basy,1640516927884,11
station_2,18600002536,18900006931,basy,1640516929887,7
station_0,18600003513,18900005514,fail,1640516929887,12
station_0,18600009400,18900001255,basy,1640516929888,18
station_0,18600004698,18900005849,success,1640516927882,1
station_0,18600007253,18900002970,barring,1640516925858,12
station_8,18600007699,18900003716,basy,1577080459130,3
package tablesql

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

/**
 * @Author yqq
 * @Date 2021/12/28 21:01
 * @Version 1.0
 */
object TestSQLByDurationCount {
  // Compute the total duration of successful calls per base station
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    // Use event time as the time characteristic
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)
    // Read the data source
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    val source: CsvTableSource = new CsvTableSource("data/statefile.log",
      Array[String]("sid", "call_out", "call_input", "call_type", "call_time", "duration"),
      Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG)
    )
    // Pure SQL variant: register the source as a named table and query it by name
    // tableEnv.registerTableSource("t_station_log", source)
    // val result: Table = tableEnv.sqlQuery(
    //   "select sid,sum(duration) as dc from t_station_log where call_type='success' group by sid")
    // Mixing Table and SQL: create a Table object from the source
    val table: Table = tableEnv.fromTableSource(source)
    // Interpolate the Table directly into the SQL string (s"..." is Scala string interpolation)
    val result: Table = tableEnv.sqlQuery(
      s"select sid,sum(duration) as dc from $table where call_type='success' group by sid")
    // Print the result; toRetractStream emits (flag, row) pairs, keep only the additions
    tableEnv.toRetractStream[Row](result)
      .filter(_._1 == true)
      .print()
    tableEnv.execute("sql")
  }
}
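For comparison, the same aggregation can also be written purely with the Table API against the table object created above. This is a sketch (same imports and environment as the code above), and it produces the same result as the SQL version:

// Table API equivalent of the SQL query: filter successful calls,
// group by station id, and sum the durations.
val apiResult: Table = table
  .filter('call_type === "success")
  .groupBy('sid)
  .select('sid, 'duration.sum as 'dc)

tableEnv.toRetractStream[Row](apiResult)
  .filter(_._1)
  .print()

Because both forms go through the same planner, the mixed SQL query and the Table API chain compile to essentially the same operator logic.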


