Flink 函数
文章目录
相关博客:
Flink-函数 | 用户自定义函数(UDF)标量函数 | 表函数 | 聚合函数 | 表聚合函数
一、Flink Table API 和 SQL 内置函数
Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。
SQL中支持的很多函数,Table API 和 SQL都已经做了实现
-  
比较函数
-  
SQL:
value1 = value2
value1 > value2
 -  
Table API
ANY1 === ANY2
ANY1 > ANY2
 
 -  
 -  
逻辑函数
-  
SQL:
boolean1= boolean2
boolean IS FALSE
NOT boolean
boolean IS FALSE
NOT boolean
 -  
Table API
BOOLEAN1 || BOOLEAN2
BOOLEAN.isFalse
!BOOLEAN
 
 -  
 -  
算数函数
-  
SQL:
numeric1 + numeric2
POWER(numeric1, numeric2)
 -  
Table API
NUMERIC1 + NUMERIC2
NUMERIC1.POWER(NUMERIC2)
 
 -  
 -  
字符串函数
-  
SQL:
string1 + string2
UPPER(string)
CHAR_LENGTH(string)
 -  
Table API
STRING1 + STRING2
STRING.upperCase()
STRING.charLength()
 
 -  
 -  
时间函数
-  
SQL:
DATE string
TIMESTAMP string
CURRENT_TIME
interval string range
 -  
Table API
STRING.toDate
STRING.toTimestamp
currentTime()
NUMERIC.days
 
 -  
 -  
聚合函数
-  
SQL:
COUNT()
SUM(expression)
RANK()
ROW_NUMBER()
 -  
Table API
FIELD.count
FIELD.sum
 
 -  
 
二、用户自定义函数(UDF)
用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力.
一些系统内置函数无法解决的需求,可以用UDF来自定义实现
在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
函数通过调用 registerFunction() 方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它。
sql函数有两大类型:
- scalar Function类似于map,一对一
 - Table Function类似与flatMap,一对多
 
2.1 标量函数(Scalar Functions)
定义标量函数,可以将0、1或多个标量值,映射到新的标量值
为了定义标量函数,必须扩展基类ScalarFunction,并实现求值(eval)方法。
标量函数的行为由求值方法决定,求值方法必须public公开声明并命名为eval
例如:
package com.root.udf;
import com.root.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
/**
 * @author Kewei
 * @Date 2022/3/9 15:44
 */
public class UDFTest1_Scalar {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
        });
        Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");
        // 自定义标量函数,实现求id的hash值
        HashCode hashCode = new HashCode(23);
        // 注册UDF函数
        tableEnv.registerFunction("hashcode",hashCode);
        // Table API
        Table resultTable = table.select("id, ts, hashcode(id)");
        // SQL
        tableEnv.createTemporaryView("sensor",table);
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashcode(id) from sensor");
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
        env.execute();
    }
    public static class HashCode **extends ScalarFunction**{
        private int factor = 13;
        public HashCode(int factor){
            this.factor = factor;
        }
        public int eval(String id){
            return id.hashCode() * 13;
        }
    }
}
 
2.2 表函数(Table Function)
用户定义的表函数,也可以将0、1或多个标量值作为输入参数,与标量函数不同,它可以返回任意数量的行作为输出,而不是单个值。
为了定义一个表函数,必须扩展TableFunction并实现求值方法。
同样的,表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval。
例如:
package com.root.udf;
import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
/**
 * @author Kewei
 * @Date 2022/3/9 16:01
 */
public class UDFTest2_Table {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 自定义表函数,类似于flatMap
        Spilt split = new Spilt("_");
        // 注册表函数
        tableEnv.registerFunction("split",split);
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");
        tableEnv.createTemporaryView("sensor",table);
        Table resultTable = table.joinLateral("split(id) as (word, length)").select("id, ts, word, length");
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length " +
                "from sensor, lateral table(split(id)) as splitid(word, length)");
        tableEnv.toAppendStream(resultTable, Row.class).print("res");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
        env.execute();
    }
    
    // 将id分割,并分别返回对应的长度
    public static class Spilt extends TableFunction<Tuple2<String,Integer>>{
        public String sep = ",";
        public Spilt(String sep){
            this.sep = sep;
        }
        public void eval(String str){
            for (String s : str.split(sep)) {
                collect(new Tuple2<>(s,s.length()));
            }
        }
    }
}
 
2.3 聚合函数(Aggregate Function)
聚合,多对一,类似前面的窗口聚合
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。
用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的。

- AggregationFunction要求必须实现的方法 
  
- createAccumulator
 - accumulate
 - getValue
 
 - AggregationFunction的工作原理如下 
  
- 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用
createAccumulator()方法初始化累加器。 - 随后,对每一个输入行调用函数的
accumulate()方法来更新累加器。 - 处理完所有行后,将调用
getValue()方法来计算并返回最终结果。 
 - 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用
 
例如:
package com.root.udf;
import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
/**
 * @author Kewei
 * @Date 2022/3/9 16:23
 */
public class UDFTest3_Aggra {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // 实例一个求平均的函数
        AvgTemp avgTemp = new AvgTemp();
        // 注册一个UDAF
        tableEnv.registerFunction("avgTemp",avgTemp);
        Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
        tableEnv.createTemporaryView("sensor",table);
        // 以下两种Table API写法结果相同
        Table result = table.groupBy("id").select("id, avgTemp(temp)");
        Table result2 = table
                .groupBy("id")
                .aggregate("avgTemp(temp) as avgtemp")
                .select("id, avgtemp");
        // sql
        Table resultSql = tableEnv.sqlQuery("select id,avgTemp(temp) from sensor group by id");
        tableEnv.toRetractStream(result2, Row.class).print("res");
        tableEnv.toRetractStream(resultSql,Row.class).print("sql");
        env.execute();
    }
    
    // 计算平均温度
    public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double,Integer>>{
        @Override
        public Double getValue(Tuple2<Double, Integer> value) {
            return value.f0/value.f1;
        }
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }
        public void accumulate(Tuple2<Double,Integer> acc, Double value){
            acc.f0 += value;
            acc.f1 += 1;
        }
    }
}
 
2.4 表聚合函数(Table Function)
用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。
用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的。

-  
AggregationFunction 要求必须实现的方法:
createAccumulator()accumulate()emitValue()
 -  
TableAggregateFunction 的工作原理如下:
- 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 
createAccumulator()方法可以创建空累加器。 - 随后,对每个输入行调用函数的 
accumulate()方法来更新累加器。 - 处理完所有行后,将调用函数的 
emitValue()方法来计算并返回最终结果。 
例如:
package com.root.udf; import com.root.SensorReading; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; /** * @author Kewei * @Date 2022/3/9 16:40 */ public class UDFTest4_TableAgg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt"); SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) { @Override public long extractTimestamp(SensorReading value) { return value.getTimestamp() * 1000L; } }); // 实例一个自定义聚合表函数 MyAggTable myAggTable = new MyAggTable(); // 注册自定义聚合表函数 tableEnv.registerFunction("myAgg",myAggTable); Table table = tableEnv.fromDataStream(dataStream, "id, temperature as temp, timestamp.rowtime as ts"); tableEnv.createTemporaryView("sensor",table); // 使用 Table result = table.groupBy("id") .flatAggregate("myAgg(temp) as (temp, rank)") .select("id, temp, rank"); // 表聚合函数 不支持 sql调用 tableEnv.toRetractStream(result, Row.class).print("res"); env.execute(); } // 创建一个类,保存排名第一和第二的温度 public static class AggTabTempAcc{ public Double highestTemp; public Double secondHighestTemp; public AggTabTempAcc(){ highestTemp = Double.MIN_VALUE; secondHighestTemp = Double.MIN_VALUE; } } // 创建一个TableAggregateFunction函数,用于统计出,同一id排名第一和第二的温度 public static class MyAggTable extends TableAggregateFunction<Tuple2<Double,Integer>,AggTabTempAcc>{ // 初始化累加器 @Override public AggTabTempAcc createAccumulator() { return new AggTabTempAcc(); } // 更新累加器 public void accumulate(AggTabTempAcc acc,Double temp){ if (temp > acc.highestTemp){ acc.secondHighestTemp = acc.highestTemp; acc.highestTemp = temp; } else if (temp > acc.secondHighestTemp){ acc.secondHighestTemp = temp; } } // 输出累加器 public void emitValue(AggTabTempAcc acc, Collector<Tuple2<Double, Integer>> col){ col.collect(new Tuple2<>(acc.highestTemp,1)); col.collect(new Tuple2<>(acc.secondHighestTemp,2)); } } } - 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 
 









