Flink SQL 扩展维表 Keyby 功能

阅读 127

2021-09-21

背景

Flink LookupTableSource 通过使用流数据的一列或者多列的值,加载外部存储数据(维表数据),进而完成对流数据的字段扩展。在维表数据不频繁变更的情况下,为提高系统的处理能力,通常将流表数据缓存到TM内存中。

当前,Flink SQL 维表Join 生成的 Operator 数据下发方式为 Forward,意味着每个subTask中缓存着相同的数据,此时缓存命中率较低。如果把维表Join的key作为Hash的条件,这样就能保证下游每一个算子缓存不同的维表数据,从而有效提升缓存命中率。

我们希望,在DDL语句中新增属性信息来控制加载维表数据,是否进行KeyBy功能。当Join多张维表时,根据表对应属性信息,选择是否进行Key操作。

实现流程

基于Flink 1.13.1版本进行扩展,以Join 多张mysql维表为例,完成维表KeyBy功能。

  1. 新增 LookupJoinHashRule 优化规则,添加到FlinkStreamRuleSets#PHYSICAL_REWRITE阶段。

    之所以在 PHYSICAL_REWRITE 阶段添加是因为,Flink对FlinkRelDistribution Trait的处理是创建了
    StreamPhysicalExchange 物理执行节点,我们只需要在形成的物理执行计划的StreamPhysicalLookupJoin 节点前增加 StreamPhysicalExchange 即可。FlinkSQL维表Join形成的物理执行树:


  2. 为 JdbcDynamicTableFactory 新增 lookup.enable_hash 属性信息,进行KeyBy控制。

public static final ConfigOption<String> LOOKUP_ENABLE_HASH =
        ConfigOptions.key("lookup.enable_hash")
                .stringType()
                .defaultValue("false")
                .withDescription("Dimension table  join enable hash.");

  1. 在 CommonPhysicalLookupJoin 新增获取维表 TableIdentifier 的方法。这样才能从CatalogManager中获取表的元数据信息。
CommonPhysicalLookupJoin#getTableIdentifier
def getTableIdentifier():ObjectIdentifier={
    val tableIdentifier: ObjectIdentifier = temporalTable match {
        case t: TableSourceTable => t.tableIdentifier
            case t: LegacyTableSourceTable[_] => t.tableIdentifier
        }
    tableIdentifier
}


LookupJoinHashRule代码:

public class LookupJoinHashRule extends RelOptRule {
    public static LookupJoinHashRule INSTANCE = new LookupJoinHashRule();

    private LookupJoinHashRule() {
        // note: 当前规则仅适用于 StreamPhysicalLookupJoin 节点。
        super(operand(StreamPhysicalLookupJoin.class, any()), "LookupJoinHashRule");
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        ObjectIdentifier tableIdentifier = ((StreamPhysicalLookupJoin) call.rel(0)).getTableIdentifier();
        CatalogManager catalogManager = call.getPlanner().getContext().unwrap(FlinkContext.class).getCatalogManager();
        CatalogManager.TableLookupResult tableLookupResult = catalogManager.getTable(tableIdentifier).get();
        // note: 读取维表的属性信息
        Map<String, String> options = tableLookupResult.getTable().getOptions();
        String enabledHash = options.getOrDefault(JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.key(), JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.defaultValue());
        return BooleanUtils.toBoolean(enabledHash);
    }

    @Override
    public void onMatch(RelOptRuleCall relOptRuleCall) {
        RelNode streamPhysicalLookupJoin = relOptRuleCall.rel(0);
        JoinInfo joinInfo = ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).joinInfo();
        //note:  构建 FlinkRelDistribution Trait
        FlinkRelDistribution requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true);
        //note:  为StreamPhysicalLookupJoin的输入节点新增StreamPhysicalExchange
        RelNode hashInput = FlinkExpandConversionRule.satisfyDistribution(
                FlinkConventions.STREAM_PHYSICAL(),
                ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).getInput(),
                requiredDistribution
               );
        // note: 使用新的物理执行节点
        relOptRuleCall.transformTo(streamPhysicalLookupJoin.copy(streamPhysicalLookupJoin.getTraitSet(),  Arrays.asList(hashInput)));
    }
}

运行测试

public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
        tableEnvironment.executeSql("CREATE TABLE kafka_table (\n" +
                "  user_id int,\n" +
                "  order_amount bigint,\n" +
                "  sname String,\n" +
                "  log_ts TIMESTAMP(3),\n" +
                "  proctime as  PROCTIME()" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'properties.kafka.max.poll.records' = '1',\n" +
                "    'properties.max.poll.records ' = '1',\n" +
                "    'topic' = 'mqTest02',\n" +
                "    'format' = 'json',\n" +
                "    'scan.startup.mode' = 'latest-offset'\n" +
                ")");
        // note: 开启HASH 
        tableEnvironment.executeSql("CREATE TABLE jdbc_table2 (\n" +
                "  id int,\n" +
                "  name varchar,\n" +
                "  description STRING,\n" +
                "  catalog STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'scan.partition.column' = 'id',\n" +
                "    'scan.partition.num' = '2',\n" +
                "    'lookup.enable_hash' = 'true',\n" +
                "    'scan.partition.lower-bound' = '1',\n" +
                "    'scan.partition.upper-bound' = '1000',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'table-name' = 'test1'\n" +
                ")");
        // note: 不开启HASH 
        tableEnvironment.executeSql("CREATE TABLE jdbc_table3 (\n" +
                "  id int,\n" +
                "  name varchar,\n" +
                "  description STRING,\n" +
                "  catalog STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'scan.partition.column' = 'id',\n" +
                "    'scan.partition.num' = '2',\n" +
                "    'lookup.enable_hash' = 'false',\n" +
                "    'scan.partition.lower-bound' = '1',\n" +
                "    'scan.partition.upper-bound' = '1000',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'table-name' = 'test2'\n" +
                ")");

        tableEnvironment.executeSql("CREATE TABLE fs_table (\n" +
                "  id bigint,\n" +
                "  name STRING,\n" +
                "  s3Name STRING,\n" +
                "  order_amount bigint,\n" +
                "  description STRING\n" +
                ") WITH (\n" +
                       "'connector' = 'print'" +
                ")");

        tableEnvironment.executeSql("INSERT INTO fs_table select s1.user_id,s2.name,s3.name,s1.order_amount,s2.description " +
                "  from kafka_table s1 " +
                "  join jdbc_table2 FOR SYSTEM_TIME AS OF s1.proctime AS s2 " +
                "       ON s1.user_id=s2.id " +
                "  join jdbc_table3 FOR SYSTEM_TIME AS OF s1.proctime  AS s3 " +
                "       ON s1.user_id=s3.id" +
                "");

}

两张维表都开启Hash操作后,运行在Yarn上的拓扑图:


一张维表开启Hash,一张未开启Hash情况下,运行在Yarn上的拓扑图:


精彩评论(0)

0 0 举报