0
点赞
收藏
分享

微信扫一扫

机器学习笔记 - 用于3D点云数据分类的Point Net的训练

穆熙沐 2024-06-20 阅读 27

一、Flink CDC 简介

1. CDC 介绍

​ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2. CDC 种类

基于查询的 CDC基于 Binlog 的 CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3. Flink CDC 介绍

​ Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors

二、Flink CDC 案例实操

1. DataStream 实现

1.1 导入依赖
<dependencies>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-java</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-streaming-java_2.12</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-clients_2.12</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.hadoop</groupId>
		 <artifactId>hadoop-client</artifactId>
		 <version>3.1.3</version>
	 </dependency>
	 <dependency>
		 <groupId>mysql</groupId>
		 <artifactId>mysql-connector-java</artifactId>
		 <version>5.1.49</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-table-planner-blink_2.12</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>com.ververica</groupId>
		 <artifactId>flink-connector-mysql-cdc</artifactId>
		 <version>2.0.0</version>
	 </dependency>
	 <dependency>
		 <groupId>com.alibaba</groupId>
		 <artifactId>fastjson</artifactId>
		 <version>1.2.75</version>
	 </dependency>
</dependencies>
<build>
	 <plugins>
		 <plugin>
			 <groupId>org.apache.maven.plugins</groupId>
			 <artifactId>maven-assembly-plugin</artifactId>
			 <version>3.0.0</version>
			 <configuration>
				 <descriptorRefs>
				 	<descriptorRef>jar-with-dependencies</descriptorRef>
				 </descriptorRefs>
			 </configuration>
			 <executions>
				 <execution>
					 <id>make-assembly</id>
					 <phase>package</phase>
					 <goals>
					 	<goal>single</goal>
					 </goals>
				 </execution>
			 </executions>
		 </plugin>
	 </plugins>
</build>
1.2 编写程序代码
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
         //1.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
         env.enableCheckpointing(5000L);
         //1.2 指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         //1.3 设置任务关闭的时候保留最后一次 CK 数据
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         //1.4 指定从 CK 自动重启策略
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
         //1.5 设置状态后端
         env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
         //1.6 设置访问 HDFS 的用户名
         System.setProperty("HADOOP_USER_NAME", "lgb");
        
        //2. 创建 FlinkCDC Source
        /*
        	StartupOptions 有 5 种类型:
        	1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取
        	2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog
        	3. latest:从 binlog 的最近位置监控读取
			4. specificOffset:从 binlog 的指定位置读取
			5. timestamp:从 binlog 的指定时间戳读取
        */
        DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder()
            .hostname("hadoop102") //Mysql所在主机名
            .port(3306) //mysql端口号
            .username("root") //登录mysql用户名
            .password("123456") //登录mysql密码
            .databaseList("cdc_test") //监控的数据库列表,可变参数
            .tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表
            .deserializer(new StringDebeziumDeserializationSchema()) //反序列化器
            .startupOptions(StartupOptions.initial()) //指定读取策略
            .build();
        
        //3. 通过 FlinkCDC Source 创建 DataStream
        DataStream<String> dataStream = env.addSource(mysqlSource);
        
        //4. 打印输出流
        dataStream.print();
        
        //5. 启动任务
        env.execute("FlinkCDC");
    }
}
1.3 测试
1.3.1 本地测试
  • 开启 MySQL Binlog 并重启 MySQL
  • 在 Mysql 中创建对应的数据库和数据表并插入一条数据
  • 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
  • 在数据表中进行增删改操作,查看程序控制台输出结果
1.3.2 集群测试
  • 将 FlinkCDC 程序进行打包并上传到集群

  • 启动 Hadoop、zookeeper 和 Flink 集群

  • 运行 FlinkCDC 程序

    bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    
  • 给当前的 Flink 程序创建 Savepoint

    bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
    
    
  • 停止 FlinkCDC 程序

  • 在Mysql数据表中进行增删改操作

  • 从 Savepoint 重启程序查看程序输出结果

    bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    

2. Flink SQL 实现

public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //2. 创建 FlinkSQL 表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        //3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offset
        tableEnv.executeSql(
        	"create table user_info (" +
            "id String primary key, name String, sex String) with (" +
            " 'connector' = 'mysql-cdc'," +
            " 'scan.startup.mode' = 'initial'," +
            " 'hostname' = 'hadoop102'," +
            " 'port' = '3306'," +
            " 'username' = 'root'," +
            " 'password' = '123456'," +
            " 'database-name' = 'cdc_test'," +
            " 'table-name' = 'user_info'" +
            ")"
        );
        
        //4. 查询输出表中数据
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);
        dataStream.print();
        
        //5. 启动任务
        env.execute("FlinkSqlCDC");
    }
}

3. 自定义反序列化器

/**
	自定义反序列化器:实现 DebeziumDeserializationSchema<T> 接口并实现 deserialize 和 getProducedType 方法 
*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
    /*
    	想要展示的数据格式:
    	{
    		"dbName":"",
    		"tableName":"",
    		"before":{"field1":"value1",...},
    		"after":{"field1":"value1",...},
    		"op":""
    	}
    */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();
        
        //1.获取库名和表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        
        //2. 获取 before 数据
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJSON = new JSONObject();
        if(before != null) {
            Schema schema = before.schema();
            List<Field> fields = schema.fields();
            
            for(Field field : fields) {
                beforeJSON.put(field.name(), before.get(field));
            }
        }
        
        //3. 获取 after 数据
        Struct after = value.getStruct("after");
        JSONObject afterJSON = new JSONObject();
        if(after != null) {
            Schema schema = after.schema();
            List<Field> fields = schema.fields();
            
            for(Field field : fields) {
                afterJSON.put(field.name(), after.get(field));
            }
        }
        
        //4. 获取操作类型 READ DELETE UPDATE CREATE
 		Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        
        result.put("dbName", fields[1]);
        result.put("tableName", fields[2]);
        result.put("before", beforeJSON);
        result.put("after", afterJSON);
        result.put("op", operation);
        
        collcetor.collect(result.toJSONString());
    }
    
    
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }	
}
举报

相关推荐

0 条评论