0
点赞
收藏
分享

微信扫一扫

Flink_DataStreamAPI_执行环境

一ke大白菜 2024-11-18 阅读 19
flinkcdc

背景

本文基于 Flink CDC 3.2.0
最近在做了一个实时数据抽取的工作,也就是比较简单的从mysql到Starrocks的工作,其实这种有很多中实现,比如说

  1. flink SQL
mysql -> starrocks
  1. flink jar
    写代码直接读取mysql的数据,之后写入到Starrocks
    其他的用 Spark Streaming 或者 Structured Streaming这种就不说了,毕竟在实时同步这块,Flink 还是占据了很大的优势

  2. canal + kafka + starrocks
    这里流程还需要中转到kakfa,流程繁琐冗长

但是以上几种方式都是得写SQL或者写代码实现,这种是要一定的专业知识和门槛的,恰好 FLink CDC 提供了一种yaml文件配置化的方式来降低这种门槛,不需要掌握专业知识就能够进行快速的进行数据传输的工作,在此分析一下 Flink CDC的整个流程

分析

Demo

首先先见一下这种快速开发数据传输的Demo:

source:
type: mysql
hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: order_dw_mysql.\.*
server-id: 5405-5415

sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
username: ${secret_values.starrocksusername}
password: ${secret_values.starrockspassword}
table.create.properties.replication_num: 1

route:
- source-table: order_dw_mysql.\.*
sink-table: order_dw_sr.<>
replace-symbol: <>
description: route all tables in source_db to sink_db

pipeline:
name: Sync MySQL Database to StarRocks
schema.change.pipeline: EVOLVE
parallelism: 1

只需要配置 source sink pipline 就可以快速的进行数据的传输工作,在flink后台最终形成的物理执行图为:
在这里插入图片描述

而且如果库表存在,则不会创建库表,否则会自动创建。

源码分析

首先是入口 CliFrontend的 main方法:

        Options cliOptions = CliFrontendOptions.initializeOptions();
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(cliOptions, args);

// Help message
if (args.length == 0 || commandLine.hasOption(CliFrontendOptions.HELP)) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(4);
formatter.setWidth(80);
formatter.printHelp(" ", cliOptions);
return;
}

// Create executor and execute the pipeline
PipelineExecution.ExecutionInfo result = createExecutor(commandLine).run();

// Print execution result
printExecutionInfo(result);

  • 首先是参数解析,解析 bash bin/flink-cdc.sh /path/mysql-to-doris.yaml 所带的参数
  • 再者createExecutor方法构建CliExecutor,这里主要配置一些Flink的参数等

重要的是run方法:

            PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer();
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
// Execute or submit the pipeline
return execution.execute();

这里主要就是解析 yaml文件,以及根据source sink构建 pipeline并生成 filnk DataStream 任务 ,这种方式和 flink SQL的方式很像,只不过 SQL方式 是 解析SQL形成对应的flink DataStream任务

  • YamlPipelineDefinitionParser.parser
    根据用户的配置,解析 SourceDef/ SinkDef /RouteDefs /TransformDefs/ UdfDef,并组装成PipelineDef
  • FlinkPipelineComposer.compose
    根据上个步骤生成的PipelineDef,解析为Flink DataStream 流,这里会根据用户配置不同,最终生成的物理计划也会不同
         public PipelineExecution compose(PipelineDef pipelineDef) {
    int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
    env.getConfig().setParallelism(parallelism);

    SchemaChangeBehavior schemaChangeBehavior =
    pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

    // Build Source Operator
    DataSourceTranslator sourceTranslator = new DataSourceTranslator();
    DataStream<Event> stream =
    sourceTranslator.translate(
    pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);

    // Build PreTransformOperator for processing Schema Event
    TransformTranslator transformTranslator = new TransformTranslator();
    stream =
    transformTranslator.translatePreTransform(
    stream, pipelineDef.getTransforms(), pipelineDef.getUdfs());

    // Schema operator
    SchemaOperatorTranslator schemaOperatorTranslator =
    new SchemaOperatorTranslator(
    schemaChangeBehavior,
    pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
    pipelineDef
    .getConfig()
    .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
    OperatorIDGenerator schemaOperatorIDGenerator =
    new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

    // Build PostTransformOperator for processing Data Event
    stream =
    transformTranslator.translatePostTransform(
    stream,
    pipelineDef.getTransforms(),
    pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
    pipelineDef.getUdfs());

    // Build DataSink in advance as schema operator requires MetadataApplier
    DataSinkTranslator sinkTranslator = new DataSinkTranslator();
    DataSink dataSink =
    sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);

    stream =
    schemaOperatorTranslator.translate(
    stream,
    parallelism,
    dataSink.getMetadataApplier()
    .setAcceptedSchemaEvolutionTypes(
    pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
    pipelineDef.getRoute());

    // Build Partitioner used to shuffle Event
    PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
    stream =
    partitioningTranslator.translate(
    stream,
    parallelism,
    parallelism,
    schemaOperatorIDGenerator.generate(),
    dataSink.getDataChangeEventHashFunctionProvider(parallelism));

    // Build Sink Operator
    sinkTranslator.translate(
    pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());

    // Add framework JARs
    addFrameworkJars();

    return new FlinkPipelineExecution(
    env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
    }


    • 根据在 pipline 配置的parallelism 设置整个任务的并行度

    • DataSourceTranslator 根据 SourceDef,生成 DataStreamSource
      这里主要是通过createDataSource 方法,利用JAVA SPI机制找出 DataSourceFactory的实现类,对应我们这里就是MySqlDataSourceFactory
      所以说 这里的source的配置都是可以参考MySqlDataSourceFactory的配置的

    • TransformTranslator 也是类似,读者自行阅读

    • DataSinkTranslator 这里会根据 SinkDef 生成 DataSink, 这个在后续的 PartitioningTranslator 以及 schemaOperatorTranslator 中都有用到
      这里也是利用JAVA SPI机制找出 DataSinkFactory的实现类,对应我们这里就是StarRocksDataSinkFactory,所以说这里的配置也是可以参考StarRocksDataSinkFactory的配置的

      • 在 schemaOperatorTranslator 中主要根据 getIncludedSchemaEvolutionTypes 方法获取可以变更的事件,这会综合 用户在sink中配置的
        exclude.schema.changesinclude.schema.changes
    • SchemaOperatorTranslator 这里会根据 pipline配置的 schema.change.behavior 值(我们这里是 LENIENT)来确定SchemaRegistryProvider的行为
      这里的链路读者自己去缕,最后会调用到 SchemaRegistryRequestHandler.applySchemaChange方法:

      private void applySchemaChange(
      TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents
      )
      {
      for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
      if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
      if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
      currentIgnoredSchemaChanges.add(changeEvent);
      continue;
      }
      }
      if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
      LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
      currentIgnoredSchemaChanges.add(changeEvent);
      } else {
      try {
      metadataApplier.applySchemaChange(changeEvent);
      schemaManager.applyEvolvedSchemaChange(changeEvent);
      currentFinishedSchemaChanges.add(changeEvent);
      } catch (Throwable t) {
      ...
      if (!shouldIgnoreException(t)) {
      currentChangeException = t;
      break;
      } else {
      ...
      }
      }
      }
      ...
      }
    }
    -  如果传递下来的事件不是`create.table`,且`schema.change.behavior` 配置的是 IGNORE,则忽略发送事件
    - 如果下游(也就是sink)不能够接受事件类型,也忽略发送该事件
    对应到我们的sink是`StarRocksMetadataApplier`:
    ```
    public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
    return Sets.newHashSet(
    SchemaChangeEventType.CREATE_TABLE,
    SchemaChangeEventType.ADD_COLUMN,
    SchemaChangeEventType.DROP_COLUMN);
    ```
    - 否则 就会进行应用:
    ```
    if (schemaChangeEvent instanceof CreateTableEvent) {
    applyCreateTable((CreateTableEvent) schemaChangeEvent);
    } else if (schemaChangeEvent instanceof AddColumnEvent) {
    applyAddColumn((AddColumnEvent) schemaChangeEvent);
    } else if (schemaChangeEvent instanceof DropColumnEvent) {
    applyDropColumn((DropColumnEvent) schemaChangeEvent);
    } else if (schemaChangeEvent instanceof RenameColumnEvent) {
    applyRenameColumn((RenameColumnEvent) schemaChangeEvent);
    } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
    applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
    } else {
    throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
    }

    对于 applyCreateTable 这里会有 如果库表存在就会不创建的判断:
       if (!catalog.databaseExists(starRocksTable.getDatabaseName())) {
    catalog.createDatabase(starRocksTable.getDatabaseName(), true);
    }

    try {
    catalog.createTable(starRocksTable, true);// 这里的true就是 ignoreIfExists 为 true
    • 组装成 FlinkPipelineExecution返回
  • PipelineExecution.execute 运行flink 任务
    这和以DataFrame api写Flink jar任务一样,调用env.executeAsync方法运行.
举报

相关推荐

0 条评论