0
点赞
收藏
分享

微信扫一扫

Flink实时数仓项目—ODS层业务数据到DWD层

小禹说财 2022-04-14 阅读 48

Flink实时数仓项目—ODS层业务数据到DWD层


前言

前面已经将日志数据和业务数据采集到了Kafka中,Kafka中的ods_xx主题就作为了实时数仓的ODS层。同时,已经完成了将ODS层的日志数据处理完放到了DWD层,下面进行对ODS层业务数据的处理。

之前,我是使用了Flink-CDC将业务数据采集到了Kafka中,把所有的数据写入了同一个主题,但是这些数据包括事实数据、也包括维度数据,使用起来非常不方便。

因此。本层的目的是将ODS层的业务数据进行处理,将维度数据保存到HBase中,将事实数据写回Kafka中作为业务数据的DWD层。


一、ODS层业务数据处理

1.ETL过滤空值数据

首先,我们从Flink-CDC采集到的业务数据的格式不一定完全正确的,数据也不一定是正常的,有些数据是没有必要的(例如Delete类型的数据),所以要做一个简单的过滤,把空值筛选掉。

2.实现动态分流功能

因为Flink-CDC是把全部数据统一写入一个topic中,很明显不方便未来的使用,因此需要把各个表拆开进行处理,相当于做了一个分流操作,把不同的表放到了不同的流里。
在实时计算中一般把维度数据写入存储容器,这样是为了方便通过主键查询,速度较快,比如有:HBase、Redis、MySQL等等。一般把事实数据写入流中,进一步处理,最终形成宽表。
但是会出现一些问题:数据库中的表可能是不完善的,在后面可能会新建表,这样的话,如果我们把表的配置写死,后面很难进行扩展,只能做保存点停止程序,然后修改程序,再重新启动,非常的麻烦。因此需要一种动态配置的方案,把这种配置长期保存下来,一旦配置有变化,实时计算就可以自动感知。

二、功能实现

1.ETL空值过滤

首先,要从Kafka的ods_base_db这个主题中读取数据,然后将读取到的String类型的数据转化为JSON对象类型,然后获取到里面的operation类型,使用filter筛选掉操作类型为delete的数据,实现如下:

    //2、消费Kafka的ods_base_db主题中的数据
    String topic="ods_base_db";
    String groupId="ods_db_app";
    DataStreamSource<String> kafkaDataStream = env.addSource(MyKafkaUtil.getKafkaSource(topic, groupId));

    //3、将每行数据转换为JSON对象并过滤掉delete数据   主流
    SingleOutputStreamOperator<JSONObject> jsonObjDataStream = kafkaDataStream.map(data -> JSON.parseObject(data))
            .filter(new FilterFunction<JSONObject>() {
                @Override
                public boolean filter(JSONObject jsonObject) throws Exception {
                    //获取对应的操作类型,前面封装的名称是operation
                    String operation = jsonObject.getString("operation");
                    return !"delete".equals(operation);
                }
            });

2.维度数据存储的选择

共有三种选择:HBase、Redis和MySQL,三者其实查询效率都挺高的,都可以通过索引加快查询。
最主要的影响选择的因素:数据量的大小
因为维度数据中某些数据量还是比较大的,比如user表,所以从数据量上来说可以选择HBase和MySQL,Redis就不太合适了(其实也可以将大表放到HBase中,小表放到Redis中,但是太麻烦了,所以不适用)。
但是如果存储到MySQL中,在用到维度数据时,需要到MySQL中查询数据,但是java后端一直在使用MySQL,再让维度表去MySQL中查询数据,会增加MySQL数据库的压力。
所以,综上可以选择存储到HBase中。

3.动态分流再分析

2.1 思路一(Pass)

首先,一种思路就是根据表名,写对应个数的测输出流,业务数据中一共有46张表,那么就需要写46个动态分流…不敢想象,这种方法最容易想到,但是不到万不得已肯定不要去使用。
另外,可能远远不止46张表。例如order_info这张表,它是既会新增又会修改的表,如果我们要求GMV,那我们只需要新增的数据,还要进行过滤;如果我们要变化的数据,那么也需要过滤,因此还得把这张表再次进行拆分。
最后,如果业务数据库新增了一张表,我们写的程序没有写这张表的测输出流,只能进行保存点保存,停掉程序,然后修改程序,然后再重启。
很明显,这样做很麻烦…

2.2 思路二

采用动态配置的方案,一旦配置发生了变化,实时计算可以自动感知,不需要去修改程序。
这又有三种方案可以选择:
1)使用zookeeper存储,通过Watch感知数据变化(zookeeper的方式写的很少,所以不使用)。

2)使用mysql数据库存储,周期性的同步。
在mysql中维护这样一张表:
在这里插入图片描述
一列是业务数据库的表名,对应一列是这张表应该发送到的业务数据库的主题.
比如,此时新建了一张表D,那么这张表也会新增一行:
在这里插入图片描述
在Flink程序中使用定时器定期去mysql中查询这样的配置信息,就可以做到配置变化的自动感知。

问题:可能会出现丢数据的问题?
问题描述:比如刚扫描完配置文件,建了一张新表,在下次扫描前,这个时间段内的数据可能会丢失。
解决办法:跟后端业务人员商量好,先在配置文件中进行修改,确保Flink程序已经扫描过了配置文件后,再去新建表,这样就不会丢数据。

3)使用mysql数据库存储,使用广播流。
如下图:
在这里插入图片描述
我们使用MySQL存储相关的配置信息,然后用Flink-CDC监控这个表,再将采集到的数据作为一个广播流(因为主流多个并行度,每个并行度都要收到这样的配置文件),用主流和广播流进行连接。这样,当配置文件新增一条信息时,广播流也会实时输出这条信息,主流就会知道发生了变化,就可以根据最新的信息做相应的处理。
综上所述,采用第三种方法最为合适。

4.思路二实现

分流操作是在ETL之后的操作,所以当前数据格式为JSONObject类型。

4.1 配置表字段的确定

我们需要通过配置表来获取到mysql某张表的数据要放到HBase或Kafka中的哪个地方,因此需要对表中的字段进行具体的分析:
要知道采集到的数据是来源于哪张表,所以要有——来源表这个字段。
因为前面提到了新增和变化的数据要区分开,所以要有——操作类型这个字段。
因为维度表放在HBase中,事实表放在Kafka中,所以要有——输出类型这个字段。
对应的,写入HBase中需要一个表名,写入Kafka中需要一个主题,所以要有——输出表名主题名)这个字段。
将数据写入到Kafka中是可以自动创建topic的,但是使用Phoenix往HBase中写入数据时,如果表不存在,会发生异常,所以我们需要自己去建表,对应的,需要——建表所需的列名这个字段。
Phoenix中建表还要求必须有主键,所以需要——主键这个字段。
在建MySQL中表的时候,在建表语句后可能还有一些别的扩展字段,例如指明字符编码等,在Phoenix中也一样,所以需要——扩展语句这个字段。
综上所述,建表语句如下:

CREATE TABLE `table_process` (
	`source_table` varchar(200) NOT NULL COMMENT '来源表',
	`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
	`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
	`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
	`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
	`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
	`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
	PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

4.2 TableProcess实体类的创建

@Data
public class TableProcess {
    //动态分流Sink常量
    public static final String SINK_TYPE_HBASE="hbase";
    public static final String SINK_TYPE_KAFKA="kafka";
    public static final String SINK_TYPE_CK="clickhouse";

    //来源表
    String sourceTable;

    //操作类型insert、update、delete
    String operateType;

    //输出类型hbase、kafka
    String sinkType;

    //输出表(主题)
    String sinkTable;

    //输出字段
    String sinkColumns;

    //主键字段
    String sinkPk;

    //建表扩展
    String sinkExtend;

}

4.3 Flink-CDC处理配置表

我们需要使用Flink-CDC来采集MySQL中配置文件的那张表,然后转化为广播流,实现如下:

    //4、使用Flink-CDC监控配置表,并处理成广播流
    DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("000000")
            .databaseList("gmall2022-realtime")
            .tableList("gmall2022-realtime.table_process")
            .startupOptions(StartupOptions.initial())
            .deserializer(new MyDeserializationSchema())
            .build();
    DataStreamSource<String> tableProcessStrDataStream = env.addSource(sourceFunction);
    MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<String, TableProcess>("table-process-state",String.class,TableProcess.class);
    BroadcastStream<String> broadcastStream = tableProcessStrDataStream.broadcast(mapStateDescriptor);

广播状态是MapStateDescriptor<String, TableProcess>类型的,key为String类型,Value是对应配置表中的一条数据(映射成了对应的实体类)。

4.4 Phoenix配置类

因为后面建表要用Phoenix,所以需要它的一些配置信息,这里直接定义一个配置类,方便使用:

public class GmallConfig {
    //Phoenix 库名
    public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
    //Phoenix 驱动
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    //Phoenix 连接参数
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
}

4.5 处理广播数据

    //6、处理数据:  广播流数据,主流数据(根据广播流数据进行处理)
    //这里把HBase的数据放到了测输出流
    OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {
    };
    SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));

具体实现思路分析:
1)DataStream和broadcastStream连接后,传入一个自定义的实现了BroadcastProcessFunction类的方法,里面分别实现了对主流数据的处理和广播流数据的处理。
2)对于广播流数据的处理,因为广播流中的数据是配置信息,这些配置信息一般情况是只有新增的数据的(代表了新建了一张表),为了保险起见可以判断它的after不为null。
3)我们首先拿到对应数据的after的值,把它转化为TableProcess类型,根据它的输出类型,如果是HBase类型,就去建表;然后直接把广播流中的数据添加到广播状态里。广播状态的key有一定要求:组成key的字段必须都存在于每条业务数据中,且一个key能够唯一识别一个配置信息,所以应该采用tableName+operation作为key。代码如下:

    //value:{"database":"","tableName":"","before":{},"after":{},"operation":""}
    //广播流中每来一条数据,就代表了新增了一张表
    //那么,我们就要先去创建这张表,要不然数据没地方放(建表的时候要注意判断表是否存在)
    //创建完成之后,把这张表的信息存放到广播状态里
    //配置表中一般只有新增的数据,除非写错了,要修改或者删除
    //对于修改和新增,我们拿到after里的数据就是最终的数据
    //对于删除类型,要判断after不为null
    @Override
    public void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {
        //1、解析新增加的一条数据,获取它的after字段的值
        JSONObject jsonObject = JSON.parseObject(s);
        JSONObject after = jsonObject.getJSONObject("after");
        if(after==null||after.size()==0){
            return;
        }
        //把after中的数据映射为TableProcess这个实体类对象
        TableProcess tableProcess = JSON.parseObject(after.toString(), TableProcess.class);

        //2、如果输出类型为HBase,那么就去建表
        if(TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())){
            //建表需要表名、字段名、主键、扩展语句
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend());
        }

        //3、写入状态,广播出去
        //先获取广播状态
        BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
        //将本条数据放到广播状态中
        //我们要使用 表名+操作 来唯一确定一个状态,因为一个表的insert操作数据和update操作数据需要放到不同的主题中
        //使用表名一个字段无法唯一确定一条数据
        //同时,我们要确保组成这个key中的字段在主流中也要有,否则,主流没办法拼成这样的key来获取对应的value
        //即key的数据即要在主流中有,也要在配置表中有
        String key=tableProcess.getSourceTable()+"-"+tableProcess.getOperateType();
        broadcastState.put(key,tableProcess);
    }

4)对应的建HBase表的函数,需要表名、列名、主键、扩展语句四个参数。建表语句需要一段一段拼接起来,没办法直接去写,因为需要把列名拆分开(添加列的时候主键的添加格式和其他不一样,需要注意)。其中主键可能为空,那就设置为id;扩展语句也可能为null,就把它设置为空串""。实现如下:

    //执行建表操作
    //建表语句: create table if not exists database.tableName(id varchar primary key,tm_name varchar) xxx;
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) throws SQLException {
        //sinkPk有时候也是不写的,不写的时候默认就修改为id
        if(sinkPk==null){
            sinkPk="id";
        }
        //语句中是允许没有扩展字段的,如果没有就是null,但是不能拼接null,所以要修改为空字符串""
        if(sinkExtend==null){
            sinkExtend="";
        }

        //用stringBuffer来拼接建表语句
        StringBuffer createTableSQL = new StringBuffer();
        createTableSQL.append("create table if not exists ")
                .append(GmallConfig.HBASE_SCHEMA)
                .append(".")
                .append(sinkTable)
                .append("(");

        //将列名分隔开,依次添加到建表语句中
        String[] fields = sinkColumns.split(",");
        for (int i = 0; i < fields.length; i++) {
            //判断当前字段是否是主键
            String field = fields[i];
            if(sinkPk.equals(fields)){
                //如果是主键,就拼接成这样的情况
                createTableSQL.append(field).append(" varchar primary key");

            }else{
                //如果不是主键,就拼接成这样的情况
                createTableSQL.append(field).append(" varchar");
            }

            //如果不是最后一个字段,要加上一个逗号
            if(i<fields.length-1){
                createTableSQL.append(",");
            }
        }
        //添加建表语句后的扩展字段
        createTableSQL.append(")").append(sinkExtend);

        //预编译SQL
        PreparedStatement preparedStatement = connection.prepareStatement(createTableSQL.toString());
        //执行
        preparedStatement.execute();
        //关闭资源
        preparedStatement.close();
    }

4.6 处理主流数据

主流中的数据需要干这么几件事:
1)获取到对应的主流里的数据,然后从广播状态中根据key获取相应的配置信息
2)根据拿到的配置信息去过滤字段,配置信息中有一项列字段集合,就代表是我们需要的字段
3)将处理完的数据进行分流,根据输出类型判断是写入到Kafka中还是HBase中。
实现如下:

    //对于主流中的每个数据,都可以拿到广播状态中的所有数据
    //这才是广播状态的真正意义
    //1、每来一条数据,从状态中找到对应的table_process
    //2、过滤数据,某些字段是不需要的
    //3、分流
    @Override
    public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {
        //1、获取当前数据对应的状态
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);
        String key=jsonObject.getString("tableName")+"-"+jsonObject.getString("operation");
        TableProcess tableProcess = broadcastState.get(key);
        //判断当前的数据能否找到对应的配置信息
        if(tableProcess!=null){
            //2、如果能找到,就根据里面需要的列,过滤掉一部分字段
            JSONObject after = jsonObject.getJSONObject("after");
            filterColumn(after,tableProcess.getSinkColumns());

            //3、分流
            //为了区分每张表放到哪个HBase中的表或者Kafka的哪个主题,需要将信息放进去
            String sinkType = tableProcess.getSinkType();
            jsonObject.put("sinkTable",tableProcess.getSinkTable());
            if(TableProcess.SINK_TYPE_KAFKA.equals(sinkType)){
                //Kafka数据,放入主流
                collector.collect(jsonObject);
            }else if(TableProcess.SINK_TYPE_HBASE.equals(sinkType)){
                //如果输出类型为HBase,就放到测输出流
                readOnlyContext.output(outputTag,jsonObject);
            }
        }else{
            System.out.println("该组合Key:"+key+"不存在!");
        }
    }

相应的过滤字段的函数:

    //after:{"id":"11","tm_name":"atguigu","logo_url":"aaa"}
    //sinkColumns: id,tm_name
    //过滤后:{"id":"11","tm_name":"atguigu"}
    private void filterColumn(JSONObject after, String sinkColumns) {
        //获取所需要的字段的名称
        String[] fields = sinkColumns.split(",");
        List<String> columns = Arrays.asList(fields);

        //遍历after的所有字段判断是否需要,不需要则直接删除
        Iterator<Map.Entry<String, Object>> iterator = after.entrySet().iterator();
        while(iterator.hasNext()){
            Map.Entry<String, Object> next = iterator.next();
            if(!columns.contains(next.getKey())){
                iterator.remove();
            }
        }
    }

到这里,对主流和广播流的处理就完成了。

4.7 数据写入HBase

HBase的数据已经写入到了测输出流,可以获取测输出流,然后将这些数据依次写入到HBase中对应的表中。
将数据写入到HBase表中要使用sink,但是没有提供HBase和Phoenix的sink方法,而JDBC Sink中要求提前写好插入数据的个数,即占位符,所以也不适合。因此,只能自定义sink类,又因为向HBase中写入数据需要连接数据库,所以最好集成RichSinkFunction函数,在周期方法里连接一次,可以节省资源。
实现代码如下:

    //7、提取Kafka流数据和HBase流数据
    DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);

    //8、将Kafka数据写入Kafka主题,将HBase数据写入Phoenix表
    hbase.addSink(new DimSinkFunction());

自定义Sink类插入数据到HBase:

	public class DimSinkFunction extends RichSinkFunction<JSONObject> {
	
	    private Connection connection;
	
	    @Override
	    public void open(Configuration parameters) throws Exception {
	        Class.forName(GmallConfig.PHOENIX_DRIVER);
	        connection= DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
	
	    }
	
	    @Override
	    public void close() throws Exception {
	        super.close();
	    }
	
	    //value:{"sinkTable":"","after":"","before":"","operation":"","database":"","tableName":""}
	    //SQL:"upsert into db.tn(id,tm_name) values(...,...)
	    @Override
	    public void invoke(JSONObject value, Context context) throws Exception {
	        //1、编写插入语句
	        StringBuffer insertSQL = new StringBuffer();
	        insertSQL.append("insert into ")
	                .append(GmallConfig.HBASE_SCHEMA)
	                .append(".")
	                .append(value.getString("sinkTable"))
	                .append("('");
	        JSONObject after = value.getJSONObject("after");
	        Set<String> keySet = after.keySet();
	        Collection<Object> values = after.values();
	        //将set类型的key用逗号拼接起来
	        String columns = StringUtils.join(keySet, "','");
	        insertSQL.append(columns)
	                .append("') values ('")
	                .append(StringUtils.join(values, "','"))
	                .append("')");
	
	        //2、执行插入语句
	        PreparedStatement preparedStatement = connection.prepareStatement(insertSQL.toString());
	        preparedStatement.executeUpdate();
	        preparedStatement.close();
	    }
	}

需要注意的是:
要注意处理的数据的格式;
拼接插入语句的时候,值的两边要加上单引号;

4.8 数据写入Kafka中

要写往Kafka中的数据全在主流中,我们需要将主流的数据写入Kafka中,但是之前调用获取KafkaSink的那个方法明显不适用了,因为没办法提前知道该条数据要放到哪个主题,所以没有办法传递参数,只有两种方法:一是自定义Sink;二是使用别的KafkaProducer方法,这个方法可以在参数中指定该条数据的主题。
使用KafkaProducer方法实现如下:

    kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
            return new ProducerRecord<byte[], byte[]>(element.getString("sinkTable"),element.getString("after").getBytes());
        }
    }));

KafkaProducer如下:

    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> flinkKafkaProducer){
        Properties singleProp=new Properties();
        singleProp.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVE);

        return new FlinkKafkaProducer<T>(default_topic, flinkKafkaProducer,singleProp, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

FlinkKafkaProducer第一个参数是默认的主题,如果在参数中没有指定主题,那么就是用这个默认的主题;
第二个参数是Kafka的序列化方法,实际上,就是在这个地方指定Kafka的主题的。
第三个参数是Kafka的配置文件。
第四个参数是指定Kafka的一致性级别。
KafkaSerializationSchema中要实现一个方法,就是ProducerRecord这个方法,这个方法的一个实现方法如下,可以直接获取并设置数据流中的每条数据写入Kafka的主题,它要求Key和value都为byte[]类型,这里key为null,所以不用设置。value在传递时要转化为byte类型:
在这里插入图片描述
至此,ODS层->DWD层所有的环节都完成了。

举报

相关推荐

0 条评论