Background
This article is based on Flink CDC 3.2.0.
Recently I worked on a real-time data ingestion task: a fairly simple sync from MySQL to StarRocks. There are actually quite a few ways to implement this, for example:

- Flink SQL: mysql -> starrocks
- Flink jar: write code that reads from MySQL directly and then writes to StarRocks (other options such as Spark Streaming or Structured Streaming are not discussed here, since Flink still has a clear edge in real-time synchronization)
- canal + Kafka + StarRocks: the data has to make an extra hop through Kafka, which makes the pipeline long and cumbersome

However, all of the approaches above require writing SQL or code, which demands a certain amount of expertise. Flink CDC happens to provide a YAML-based, configuration-only way to lower that barrier: data transfer can be set up quickly without specialized knowledge. This article analyzes the overall flow of Flink CDC.
Analysis
Demo
First, let's look at a demo of this quick way to build a data transfer job:
```
source:
  type: mysql
  hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: order_dw_mysql.\.*
  server-id: 5405-5415

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
  load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
  username: ${secret_values.starrocksusername}
  password: ${secret_values.starrockspassword}
  table.create.properties.replication_num: 1

route:
  - source-table: order_dw_mysql.\.*
    sink-table: order_dw_sr.<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db

pipeline:
  name: Sync MySQL Database to StarRocks
  schema.change.behavior: EVOLVE
  parallelism: 1
```
With only source, sink, and pipeline configured, the data transfer job is ready to run, and a physical execution graph is eventually formed in the Flink web UI. Moreover, if the target database and table already exist they will not be created again; otherwise they are created automatically.
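To make the route rule concrete: as I understand it, the `replace-symbol` string `<>` in `sink-table` is substituted with the source table name, so every table under `order_dw_mysql` is routed to a same-named table under `order_dw_sr`. A toy sketch of that substitution (hypothetical helper, not Flink CDC's actual router):

```
// Toy illustration of the route rule above: "<>" in the sink-table pattern
// is replaced with the source table name. Hypothetical helper, not Flink CDC code.
public class RouteSketch {
    static String route(String sourceTable, String sinkPattern, String replaceSymbol) {
        // sourceTable is like "order_dw_mysql.orders"; keep only the table part
        String tableName = sourceTable.substring(sourceTable.indexOf('.') + 1);
        return sinkPattern.replace(replaceSymbol, tableName);
    }

    public static void main(String[] args) {
        // prints "order_dw_sr.orders"
        System.out.println(route("order_dw_mysql.orders", "order_dw_sr.<>", "<>"));
    }
}
```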
Source Code Analysis
The entry point is the main method of CliFrontend:
```
Options cliOptions = CliFrontendOptions.initializeOptions();
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(cliOptions, args);

// Help message
if (args.length == 0 || commandLine.hasOption(CliFrontendOptions.HELP)) {
    HelpFormatter formatter = new HelpFormatter();
    formatter.setLeftPadding(4);
    formatter.setWidth(80);
    formatter.printHelp(" ", cliOptions);
    return;
}

// Create executor and execute the pipeline
PipelineExecution.ExecutionInfo result = createExecutor(commandLine).run();

// Print execution result
printExecutionInfo(result);
```
- First comes argument parsing: the arguments passed along with `bash bin/flink-cdc.sh /path/mysql-to-doris.yaml` are parsed.
- Then the createExecutor method builds a CliExecutor; this mainly sets up Flink configuration and the like. The important part is its run method:
```
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
        pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);

// Create composer
PipelineComposer composer = getComposer();

// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);

// Execute or submit the pipeline
return execution.execute();
```
This mainly parses the YAML file, builds a pipeline from the source and sink definitions, and generates a Flink DataStream job. The approach is very similar to Flink SQL, except that the SQL path parses SQL statements to produce the corresponding Flink DataStream job.
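Before looking at the two steps described next, here is a rough idea of what "parsing the YAML" amounts to. This is only a sketch that reads the raw sections with SnakeYAML; the real YamlPipelineDefinitionParser (first bullet below) builds typed SourceDef/SinkDef/... objects instead:

```
// Minimal sketch of reading the pipeline YAML into raw maps (SnakeYAML).
// Not the actual YamlPipelineDefinitionParser, which produces a PipelineDef.
import org.yaml.snakeyaml.Yaml;

import java.io.FileReader;
import java.io.Reader;
import java.util.Map;

public class YamlSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        try (Reader reader = new FileReader(args[0])) {
            Map<String, Object> pipelineYaml = new Yaml().load(reader);
            Map<String, Object> source = (Map<String, Object>) pipelineYaml.get("source");
            Map<String, Object> sink = (Map<String, Object>) pipelineYaml.get("sink");
            // "type" decides which factory is looked up later ("mysql" / "starrocks" here)
            System.out.println("source type = " + source.get("type"));
            System.out.println("sink type   = " + sink.get("type"));
        }
    }
}
```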
- YamlPipelineDefinitionParser.parse: parses the user configuration into SourceDef / SinkDef / RouteDefs / TransformDefs / UdfDefs and assembles them into a PipelineDef.
- FlinkPipelineComposer.compose: takes the PipelineDef produced in the previous step and translates it into a Flink DataStream topology; depending on the user configuration, the resulting physical plan will differ:

```
public PipelineExecution compose(PipelineDef pipelineDef) {
    int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
    env.getConfig().setParallelism(parallelism);

    SchemaChangeBehavior schemaChangeBehavior =
            pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

    // Build Source Operator
    DataSourceTranslator sourceTranslator = new DataSourceTranslator();
    DataStream<Event> stream =
            sourceTranslator.translate(
                    pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);

    // Build PreTransformOperator for processing Schema Event
    TransformTranslator transformTranslator = new TransformTranslator();
    stream =
            transformTranslator.translatePreTransform(
                    stream, pipelineDef.getTransforms(), pipelineDef.getUdfs());

    // Schema operator
    SchemaOperatorTranslator schemaOperatorTranslator =
            new SchemaOperatorTranslator(
                    schemaChangeBehavior,
                    pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
                    pipelineDef
                            .getConfig()
                            .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
    OperatorIDGenerator schemaOperatorIDGenerator =
            new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

    // Build PostTransformOperator for processing Data Event
    stream =
            transformTranslator.translatePostTransform(
                    stream,
                    pipelineDef.getTransforms(),
                    pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                    pipelineDef.getUdfs());

    // Build DataSink in advance as schema operator requires MetadataApplier
    DataSinkTranslator sinkTranslator = new DataSinkTranslator();
    DataSink dataSink =
            sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);

    stream =
            schemaOperatorTranslator.translate(
                    stream,
                    parallelism,
                    dataSink.getMetadataApplier()
                            .setAcceptedSchemaEvolutionTypes(
                                    pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
                    pipelineDef.getRoute());

    // Build Partitioner used to shuffle Event
    PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
    stream =
            partitioningTranslator.translate(
                    stream,
                    parallelism,
                    parallelism,
                    schemaOperatorIDGenerator.generate(),
                    dataSink.getDataChangeEventHashFunctionProvider(parallelism));

    // Build Sink Operator
    sinkTranslator.translate(
            pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());

    // Add framework JARs
    addFrameworkJars();

    return new FlinkPipelineExecution(
            env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}
```
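For readers less familiar with the DataStream API: what the translators above do is conceptually just appending operators to a stream, the same way one would chain them by hand in a plain Flink jar job. A trivial sketch (unrelated to CDC, only to illustrate the source -> transform -> sink chaining):

```
// Plain Flink DataStream job, only to illustrate "translate into a DataStream topology":
// operators are chained source -> transform -> sink, then the environment is executed.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements("a", "b", "c")        // source operator
                .map(s -> s.toUpperCase())     // transform operator
                .print();                      // sink operator (print sink)
        env.execute("toy-pipeline");
    }
}
```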
- The parallelism configured in the pipeline section sets the parallelism of the whole job.
- DataSourceTranslator generates a DataStreamSource from the SourceDef. This mainly happens in the createDataSource method, which uses the Java SPI mechanism (a minimal sketch follows below) to find the DataSourceFactory implementation, in our case MySqlDataSourceFactory. The source options can therefore be looked up against MySqlDataSourceFactory.
- TransformTranslator works similarly; readers can explore it on their own.
- DataSinkTranslator generates a DataSink from the SinkDef, which is used later by both the PartitioningTranslator and the SchemaOperatorTranslator. It likewise uses the Java SPI mechanism to find the DataSinkFactory implementation, here StarRocksDataSinkFactory, so the sink options can be looked up against StarRocksDataSinkFactory.
- In the schema operator step, the getIncludedSchemaEvolutionTypes method determines which schema change event types may be applied, combining the exclude.schema.changes and include.schema.changes options configured by the user on the sink.
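The factory lookup mentioned in the DataSourceTranslator / DataSinkTranslator items above is plain Java SPI. A minimal sketch of that mechanism with a hypothetical Factory interface (Flink CDC's own DataSourceFactory / DataSinkFactory API is richer than this):

```
// Minimal Java SPI sketch of the factory lookup idea: implementations are listed
// under META-INF/services/<fully qualified name of Factory> and discovered at runtime.
// "Factory" and "identifier()" are hypothetical stand-ins for Flink CDC's
// DataSourceFactory / DataSinkFactory, which are matched against the configured "type".
import java.util.ServiceLoader;

interface Factory {
    String identifier(); // e.g. "mysql" or "starrocks"
}

public class SpiSketch {
    public static Factory discover(String identifier) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (factory.identifier().equals(identifier)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No factory found for identifier: " + identifier);
    }
}
```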
- SchemaOperatorTranslator determines the behavior of the SchemaRegistryProvider according to the schema.change.behavior value configured in the pipeline (LENIENT in our case). Readers can trace the call chain themselves; it eventually reaches the SchemaRegistryRequestHandler.applySchemaChange method:

```
private void applySchemaChange(
        TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
    for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
        if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
            if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                currentIgnoredSchemaChanges.add(changeEvent);
                continue;
            }
        }
        if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
            LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
            currentIgnoredSchemaChanges.add(changeEvent);
        } else {
            try {
                metadataApplier.applySchemaChange(changeEvent);
                schemaManager.applyEvolvedSchemaChange(changeEvent);
                currentFinishedSchemaChanges.add(changeEvent);
            } catch (Throwable t) {
                ...
                if (!shouldIgnoreException(t)) {
                    currentChangeException = t;
                    break;
                } else {
                    ...
                }
            }
        }
        ...
    }
```
The logic above is:
- If the incoming event is not a CREATE_TABLE event and `schema.change.behavior` is configured as IGNORE, the event is ignored rather than forwarded.
- If the downstream (i.e. the sink) cannot accept the event type, the event is also ignored. For our sink that is `StarRocksMetadataApplier`:
```
public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
    return Sets.newHashSet(
            SchemaChangeEventType.CREATE_TABLE,
            SchemaChangeEventType.ADD_COLUMN,
            SchemaChangeEventType.DROP_COLUMN);
}
```
- Otherwise the change is applied:

```
if (schemaChangeEvent instanceof CreateTableEvent) {
    applyCreateTable((CreateTableEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof AddColumnEvent) {
    applyAddColumn((AddColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof DropColumnEvent) {
    applyDropColumn((DropColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof RenameColumnEvent) {
    applyRenameColumn((RenameColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
    applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
} else {
    throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
}
```

In applyCreateTable there is a check so that the database/table is not created again if it already exists:

```
if (!catalog.databaseExists(starRocksTable.getDatabaseName())) {
    catalog.createDatabase(starRocksTable.getDatabaseName(), true);
}
try {
    catalog.createTable(starRocksTable, true); // this "true" means ignoreIfExists = true
    ...
```

- The result is then assembled into a FlinkPipelineExecution and returned.
- PipelineExecution.execute runs the Flink job. Just like a hand-written Flink jar job using the DataStream API, it calls the env.executeAsync method to run it.
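A rough sketch of what that final step boils down to (simplified; the actual FlinkPipelineExecution.execute returns an ExecutionInfo describing the submitted job, and the topology here is only a stand-in):

```
// Simplified sketch: submit the composed job asynchronously and report its id,
// roughly what the execute step amounts to. Not the exact Flink CDC code.
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("event").print(); // stand-in for the composed CDC topology

        JobClient jobClient = env.executeAsync("Sync MySQL Database to StarRocks");
        System.out.println("Submitted job " + jobClient.getJobID());
    }
}
```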