1. Comparing text, ORC and Parquet performance in Hive and Impala with 100 million rows of data (Part 1)



Table of Contents

  • I. Conclusions
  • II. Brief introduction to the three file formats
  • 1. parquet
  • 2. text
  • 3. orc
  • III. Requirement
  • IV. Implementation steps
  • V. Implementation
  • 1. Create the topic
  • 2. Write 100 million records into the Kafka topic t_kafka_flink_user
  • 1) pom.xml
  • 2) Code
  • 1. Bean
  • 2. Producer
  • 3) Verification
  • 3. Transform the data in Kafka topic t_kafka_flink_user with Flink and store it into MySQL
  • 1) pom.xml
  • 2) Code
  • 1. FlinkUserSinkMySQL
  • 2. Main program
  • 3) Verification
  • 4. Write the MySQL data to the HDFS cluster as ORC files with Snappy compression
  • 1) pom.xml
  • 2) Code
  • 1. Bean
  • 2. Main program: read the MySQL database and write ORC files with Snappy compression to the HDFS cluster
  • 3) Verification
  • 5. Write the MySQL data to the HDFS cluster as Parquet files with Snappy compression
  • 1) pom.xml
  • 2) Code
  • 1. Bean
  • 2. Main program: read the MySQL data and write Parquet files with Snappy compression to the HDFS cluster
  • 3. Verify that the file written to HDFS can be read back as Parquet
  • 3) Verification
  • 6. Create the table t_kafkauser_orc in Hive and load the data
  • 1) Connect to Hive
  • 2) Create the table
  • 3) Import the data with Sqoop
  • 4) Verification
  • 7. Create the table t_kafkauser_parquet in Hive and load the data
  • 1) Connect to Hive
  • 2) Create the table
  • 3) Import the data with Sqoop
  • 4) Verification
  • 8. Refresh the Hive tables in Impala


This article stores the same amount of data in HDFS in three different file formats and compares how it can be queried from two clients, Hive and Impala.
Prerequisites: familiarity with Hadoop, Hive, Impala, Kafka, Flink and the like, with working environments for each of them. (Later articles in this column will cover these components in full; the ZooKeeper and Hadoop parts are already done.)
The article is organized into the following parts: conclusions, an introduction to the three file formats, the requirement, the implementation steps, the implementation, and a check of conclusions published elsewhere online.

Because the full write-up is quite long, it is split into two articles: the first prepares the data, the second runs the query comparison. This is the first article.

It is continued in "1. Comparing text, ORC and Parquet performance in Hive and Impala with 100 million rows of data (Part 2)", which contains the actual comparison numbers.

I. Conclusions

  • Which storage format to use in production depends on the actual application; each format has its pros and cons.
  • Considering query speed alone, ORC is the first choice in Hive and Parquet is the first choice in Impala.
  • Considering query speed and storage size together, ORC is the first choice in Hive, and ORC is also the first choice in Impala (the file sizes differ by nearly 7x while the query speeds differ by only about 4x).
  • In general, taken across the whole system, Parquet is the common choice. The format's characteristics still have to be weighed, though; for example, Parquet does not support index pages and is mainly used together with Impala.

II. Brief introduction to the three file formats

1. parquet

  • Advantages: efficient compression and encoding, so less I/O is needed to fetch the required data, making retrieval faster than ORC in that respect; otherwise it is similar to ORC.
  • Disadvantages: no UPDATE support; no ACID support; the raw data cannot be inspected directly.
  • Typical use: scenarios where Impala and Hive share data and metadata.

2. text

  • Advantages: any delimiter can be used; files can be viewed and searched directly on HDFS; loading is fast.
  • Disadvantages: the data is not compressed, so storage usage, disk overhead and parsing overhead are all high.
  • Typical use: the data source / ODS layer, for cases where scripts load raw data into Hive warehouse tables.

3. orc

  • Advantages: a very high compression ratio and splittable files; because the input data volume is smaller, queries use fewer tasks, which improves query speed and processing performance; each task writes a single output file, reducing the load on the NameNode; ORC files carry a lightweight index per field, such as the row group index and bloom filter index, which can be used for WHERE-clause filtering; data can be loaded with the load command, although after such a load a plain select * from xx; cannot read it; queries are faster than with RCFile; complex data types are supported.
  • Disadvantages: the raw data cannot be inspected directly; reads and writes spend some extra CPU on compression and decompression, though not much; schema evolution support is weak.
  • Typical use: the ORC format exists to speed up processing and shrink the disk footprint of the files.

General recommendations (the actual comparison is carried out below); a hedged Hive DDL sketch for the comparison tables follows this list:

  • For small queries where the stored data itself needs to be inspected, the default textfile format works.
  • For small queries where the raw data does not need to be inspected, sequencefile can be used.
  • For queries over large volumes of data, rcfile, ORC or Parquet can be used; ORC is generally recommended, and Parquet is recommended when there are many columns, no updates are involved, and most queries read only a subset of the columns.
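
As a concrete illustration of those recommendations, here is a minimal, hedged sketch that creates one comparison table per storage format over Hive JDBC. The hive-jdbc driver on the classpath, the HiveServer2 address server4:10000, the database name test, the user name and the table name t_kafkauser_text are assumptions made for the example; the ORC and Parquet table names and the column list match the ones used later in this article.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hedged sketch: one table per storage format so the same rows can be compared.
public class CreateComparisonTables {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://server4:10000/test", "alanchan", "");
             Statement st = conn.createStatement()) {
            String columns = "(id int, userid int, name string, url string, create_time string)";
            // plain text table, comma separated, matching the bean's toString() layout
            st.execute("CREATE TABLE IF NOT EXISTS t_kafkauser_text " + columns
                    + " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE");
            // columnar tables for the same schema
            st.execute("CREATE TABLE IF NOT EXISTS t_kafkauser_orc " + columns + " STORED AS ORC");
            st.execute("CREATE TABLE IF NOT EXISTS t_kafkauser_parquet " + columns + " STORED AS PARQUET");
        }
    }
}

In the article itself the tables are created from the Hive client in steps 6 and 7; the JDBC route above is only an equivalent way to issue the same DDL.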

III. Requirement

Run different SQL analyses over 100 million rows in Hive and Impala, and compare the query performance and storage size of the text, ORC and Parquet file formats.

IV. Implementation steps

The step below that writes the MySQL data to HDFS with MapReduce is not needed for the comparison itself (it can be skipped); it only measures how long the write to HDFS takes and how large the output is.
1. Create the topic t_kafka_flink_user
2. Write the messages into Kafka with a program
3. Process the Kafka data with Flink and write it into MySQL
4. Load the MySQL data into Hive with Sqoop
5. Query the text, ORC and Parquet tables in Hive and Impala, then compare and analyze the results

V. Implementation

This section follows the order of the implementation steps; feel free to jump to the part you need.

1. Create the topic

kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafka_flink_user --partitions 1 --replication-factor 2

# The topic is created with a single partition, i.e. it is consumed by a single consumer. With multiple partitions, duplicate consumption can occur when sinking to MySQL, and that would have to be handled with a two-phase-commit approach. This example only produces data, and two-phase commit is not the focus of this article, so it is not covered here.
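
For reference, the same topic can also be created programmatically. Below is a minimal sketch using the Kafka AdminClient; it assumes kafka-clients is on the classpath (the flink-connector-kafka dependency in the pom below should bring it in) and that server1:9092 is reachable.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Hedged sketch: programmatic equivalent of the kafka-topics.sh command above.
public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 2, same as the CLI command
            NewTopic topic = new NewTopic("t_kafka_flink_user", 1, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}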

2. Write 100 million records into the Kafka topic t_kafka_flink_user

1) pom.xml

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.34</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
            <scope>compile</scope>
        </dependency>
        <!--state backend -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-core -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>2.5.6</version>
        </dependency>

    </dependencies>
</project>

2) Code

1. Bean

package org.kafkasource.flink.mysql.demo1.bean;

import java.sql.Timestamp;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private int userId;
    private String url;
    private String name;
    private Timestamp createTime;
}

2. Producer

package org.kafkasource.flink.mysql.demo1.source;

import java.sql.Timestamp;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.kafkasource.flink.mysql.demo1.bean.User;
import org.springframework.util.StopWatch;

import com.alibaba.fastjson.JSON;

/**
 * Writes messages into the Kafka topic t_kafka_flink_user.
 * 
 * @author alanchan
 *
 */
public class KafkaProducerUser {
    private static final String broker_list = "server1:9092,server2:9092,server3:9092";
    private static final String topic = "t_kafka_flink_user";

    public static void writeToKafka() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10000 * 10000; i++) {

            User user = new User(i, "https://www.win.com/" + i, "alan" + i, new Timestamp(System.currentTimeMillis()));

            // partition and key are left null, so Kafka assigns the record to the (single) partition
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(user));

            producer.send(record);

            if (i % 100 == 0) {
                producer.flush();
            }

        }

        producer.close();
    }

    public static void main(String[] args) throws Exception {
        StopWatch clock = new StopWatch();
        clock.start(KafkaProducerUser.class.getSimpleName());

        writeToKafka();

        clock.stop();
        System.out.println(clock.prettyPrint());
    }
}

3) Verification

(screenshot omitted)

3. Transform the data in Kafka topic t_kafka_flink_user with Flink and store it into MySQL

1) pom.xml

Same as above.

2) Code

1. FlinkUserSinkMySQL

package org.kafkasource.flink.mysql.demo1.sink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.kafkasource.flink.mysql.demo1.bean.User;

/**
 * Flink sink to MySQL.
 * 
 * @author alanchan
 *
 */
public class FlinkUserSinkMySQL extends RichSinkFunction<User> {

    private PreparedStatement ps;
    private Connection connection;

    // Establish the connection in open() so it is not created and released on every invoke()
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "rootroot");
        String sql = "insert into t_kafka_flink_user ( userid, url, name,create_time) values (?, ?, ?,?);";
        ps = this.connection.prepareStatement(sql);
    }

    // invoke() is called once per record; each call issues a single insert (a batched variant is sketched after the main program below)
    @Override
    public void invoke(User value, Context context) throws Exception {
        // Bind the fields and execute the insert
        ps.setInt(1, value.getUserId());
        ps.setString(2, value.getUrl());
        ps.setString(3, value.getName());
        ps.setTimestamp(4, value.getCreateTime());

        ps.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statement before the connection that owns it
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

}

The target table must be created in MySQL before this example runs; the article omits the DDL, but a hedged sketch is given below.
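
A minimal sketch of that DDL, inferred from the INSERT statement in the sink and from the KafkaUser bean used later (the column types, the auto-increment id and the storage engine are assumptions; adjust them to your environment):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hedged sketch: the id column is assumed to be an auto-increment surrogate key,
// because the Flink sink inserts only the other four columns while the later
// MapReduce jobs also read id.
public class CreateMySqlTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "rootroot");
             Statement st = conn.createStatement()) {
            st.execute("CREATE TABLE IF NOT EXISTS t_kafka_flink_user ("
                    + " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
                    + " userid INT,"
                    + " url VARCHAR(255),"
                    + " name VARCHAR(64),"
                    + " create_time DATETIME"
                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8");
        }
    }
}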

2. Main program

package org.kafkasource.flink.mysql.demo1.main;

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.kafkasource.flink.mysql.demo1.bean.User;
import org.kafkasource.flink.mysql.demo1.sink.FlinkUserSinkMySQL;
import org.kafkasource.flink.mysql.demo1.source.KafkaProducerUser;
import org.springframework.util.StopWatch;

import com.alibaba.fastjson.JSON;

/**
 * alanchan
 *
 */
public class App {
    private static final String topic = "t_kafka_flink_user";
    private static final String broker_list = "server1:9092,server2:9092,server3:9092";

    public static void main(String[] args) throws Exception {

        StopWatch clock = new StopWatch();
        clock.start(App.class.getSimpleName());
        
        kafkaSinkMysql();
        
        clock.stop();
        System.out.println(clock.prettyPrint());
    }

    private static void kafkaSinkMysql() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.setProperty("enable.auto.commit", "true");
        props.put("group.id", topic);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        FlinkKafkaConsumer<String> fkc = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

        // MapFunction<T, O>
        // note: the topic has a single partition, so only one of the 3 source subtasks actually receives data
        SingleOutputStreamOperator<User> user = env.addSource(fkc).setParallelism(3).map(new MapFunction<String, User>() {

            @Override
            public User map(String value) throws Exception {
                User u = JSON.parseObject(value, User.class);
                u.setName(u.getName() + "_t");
                return u;
            }
        });

//        SingleOutputStreamOperator<User> user = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props)).setParallelism(5)
//                .map(string -> JSON.parseObject(string, User.class));

        user.addSink(new FlinkUserSinkMySQL()); // sink the data to MySQL
        env.execute("Flink Job User Kafka to MySQL");
    }
}
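
The sink above issues one executeUpdate round trip per record, which dominates the time it takes to land 100 million rows in MySQL. A hedged variant that batches the inserts is sketched below; the batch size of 1000 and the rewriteBatchedStatements flag are choices made for the sketch, not something the original uses, and records still sitting in an open batch would be lost on failure unless checkpointing with a transactional or idempotent write is added (the two-phase-commit concern mentioned in step 1).

package org.kafkasource.flink.mysql.demo1.sink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.kafkasource.flink.mysql.demo1.bean.User;

// Hedged sketch: same sink as above, but the inserts are sent in JDBC batches.
public class FlinkUserBatchSinkMySQL extends RichSinkFunction<User> {

    private static final int BATCH_SIZE = 1000;

    private PreparedStatement ps;
    private Connection connection;
    private int pending = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        // rewriteBatchedStatements lets the driver collapse a batch into a multi-row INSERT
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true",
                "root", "rootroot");
        ps = connection.prepareStatement(
                "insert into t_kafka_flink_user (userid, url, name, create_time) values (?, ?, ?, ?)");
    }

    @Override
    public void invoke(User value, Context context) throws Exception {
        ps.setInt(1, value.getUserId());
        ps.setString(2, value.getUrl());
        ps.setString(3, value.getName());
        ps.setTimestamp(4, value.getCreateTime());
        ps.addBatch();
        if (++pending >= BATCH_SIZE) {
            ps.executeBatch();
            pending = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (ps != null) {
            if (pending > 0) {
                ps.executeBatch(); // flush the tail of the last batch
            }
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}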

3) Verification

Check that the Kafka topic has no unconsumed messages left, i.e. that the consumed count matches the produced count.
Check that the total number of rows inserted into MySQL matches the number of records produced into Kafka.

[alanchan@server1 ~]$ kafka-consumer-groups.sh --bootstrap-server server1:9092 --describe --group t_kafka_flink_user

Consumer group 't_kafka_flink_user' has no active members.

GROUP              TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
t_kafka_flink_user t_kafka_flink_user 0          100000000       100000000       0               -               -               -

mysql> select count(*) from t_kafka_flink_user;
+-----------+
| count(*)  |
+-----------+
| 100000000 |
+-----------+
1 row in set (24.88 sec)

4. Write the MySQL data to the HDFS cluster as ORC files with Snappy compression

1) pom.xml

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <parquet.version>1.10.1</parquet.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.4</version>
        </dependency>
        <!-- Google Options -->
        <dependency>
            <groupId>com.github.pcj</groupId>
            <artifactId>google-options</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>



        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>

        <!-- ORC file format dependencies -->
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-shims</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-mapreduce</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework/spring-core -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>2.5.6</version>
        </dependency>

        <!-- parquet -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-encoding</artifactId>
            <version>${parquet.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2) Code

1. Bean

package org.hadoop.mr.db2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import lombok.Data;

/**
 * @author alanchan
 *
 */
@Data
public class KafkaUser implements Writable, DBWritable {
    private int id;
    private int userid;
    private String name;
    private String url;
    private String create_time;

    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, id);
        ps.setInt(2, userid);
        ps.setString(3, name);
        ps.setString(4, url);
        ps.setString(5, create_time);

    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.userid = rs.getInt(2);
        this.name = rs.getString(3);
        this.url = rs.getString(4);
        this.create_time = rs.getString(5);

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeInt(userid);
        out.writeUTF(name);
        out.writeUTF(url);
        out.writeUTF(create_time);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        userid = in.readInt();
        name = in.readUTF();
        url = in.readUTF();
        create_time = in.readUTF();
    }

    public String toString() {
        return id + "," + userid + "," + name + "," + url + "," + create_time;
    }
}

2. Main program: read the MySQL database and write ORC files with Snappy compression to the HDFS cluster

package org.hadoop.mr.db2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;
import org.springframework.util.StopWatch;

/**
 * @author alanchan
 *
 */
public class ReadFromMysqlToOrcFileBySnappy extends Configured implements Tool {
    // id int,userid int, name string,url string, create_time string
    private static final String SCHEMA = "struct<id:int,userid:int,name:string,url:string,create_time:string>";
    static String out = "hdfs://hadoopha/test/hive/input/orc";

//hdfs://hadoopha/tmp/hive/root/input/orc
    static class ReadFromMysqlToOrcFileBySnappyMapper extends Mapper<LongWritable, KafkaUser, NullWritable, OrcStruct> {
        // Parse the type description (schema) for the ORC output
        private TypeDescription schema = TypeDescription.fromString(SCHEMA);
        // The output key is always NullWritable
        private final NullWritable outKey = NullWritable.get();
        // The output value is an OrcStruct built from the schema
        private final OrcStruct outValue = (OrcStruct) OrcStruct.createValue(schema);

        protected void map(LongWritable key, KafkaUser user, Context context) throws IOException, InterruptedException {
            Counter counter = context.getCounter("mysql_records_counters", "User Records");
            counter.increment(1);

//            String[] fields = value.toString().split(",", 6);
            // Copy each field into the corresponding column of the output struct
            outValue.setFieldValue(0, new IntWritable(user.getId()));
            outValue.setFieldValue(1, new IntWritable(user.getUserid()));
            outValue.setFieldValue(2, new Text(user.getName()));
            outValue.setFieldValue(3, new Text(user.getUrl()));
            outValue.setFieldValue(4, new Text(user.getCreate_time()));

            context.write(outKey, outValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(this.getConf(), SCHEMA);
        Configuration conf = getConf();

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.10.44:3306/test", "root", "rootroot");

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        DBInputFormat.setInput(job, KafkaUser.class, "select id, userid,name,url,create_time  from t_kafka_flink_user ", "select count(*) from  t_kafka_flink_user ");
        // where name = 'alan234350_t'
        job.setInputFormatClass(DBInputFormat.class);

        Path outputDir = new Path(out);
        outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, outputDir);

        job.setMapperClass(ReadFromMysqlToOrcFileBySnappyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(OrcStruct.class);
        job.setOutputFormatClass(OrcOutputFormat.class);

        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        StopWatch clock = new StopWatch();
        clock.start(ReadFromMysqlToOrcFileBySnappy.class.getSimpleName());

        Configuration conf = new Configuration();
        // Configure HDFS high-availability (HA) access
        conf.set("fs.defaultFS", "hdfs://hadoopha");
        conf.set("dfs.nameservices", "hadoopha");
//        /namenode29,namenode47
        conf.set("dfs.ha.namenodes.hadoopha", "namenode29,namenode47");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode29", "server6:8020");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode47", "server7:8020");
        conf.set("dfs.client.failover.proxy.provider.hadoopha", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");

        int status = ToolRunner.run(conf, new ReadFromMysqlToOrcFileBySnappy(), args);

        // print the stopwatch before exiting; after System.exit() these lines would be unreachable
        clock.stop();
        System.out.println(clock.prettyPrint());

        System.exit(status);
    }

}

3) Verification

Check the main program's run result (the MapReduce job log, mainly the number of records read and written). (The write to HDFS completed in about 4 minutes; 10.78 GB of source data compressed down to 823 MB.)
Check that the output file exists in HDFS and check its size.
Check that the file written to HDFS can be read back through the ORC format; the article omits this step, but a sketch of such a check follows the job log below.

2023-01-19 12:00:22,861 INFO mapreduce.JobSubmitter: number of splits:1
2023-01-19 12:00:23,016 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local711750867_0001
2023-01-19 12:00:23,018 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-19 12:00:23,147 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2023-01-19 12:00:23,149 INFO mapreduce.Job: Running job: job_local711750867_0001
2023-01-19 12:00:23,150 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2023-01-19 12:00:23,153 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2023-01-19 12:00:23,153 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2023-01-19 12:00:23,153 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2023-01-19 12:00:23,249 INFO mapred.LocalJobRunner: Waiting for map tasks
2023-01-19 12:00:23,251 INFO mapred.LocalJobRunner: Starting task: attempt_local711750867_0001_m_000000_0
2023-01-19 12:00:23,268 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2023-01-19 12:00:23,271 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2023-01-19 12:00:23,281 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
2023-01-19 12:00:23,401 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@7fa1434b
Thu Jan 19 12:00:23 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-01-19 12:00:23,420 INFO mapred.MapTask: Processing split: org.apache.hadoop.mapreduce.lib.db.DBInputFormat$DBInputSplit@31c2b3f6
Thu Jan 19 12:00:23 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-01-19 12:00:23,481 INFO impl.HadoopShimsPre2_7: Can't get KeyProvider for ORC encryption from hadoop.security.key.provider.path.
2023-01-19 12:00:23,492 INFO impl.OrcCodecPool: Got brand-new codec ZLIB
2023-01-19 12:00:23,493 INFO impl.PhysicalFsWriter: ORC writer created for path: hdfs://hadoopha/test/hive/input/orc/_temporary/0/_temporary/attempt_local711750867_0001_m_000000_0/part-m-00000.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: ZLIB buffer: 262144
2023-01-19 12:00:23,621 INFO impl.WriterImpl: ORC writer created for path: hdfs://hadoopha/test/hive/input/orc/_temporary/0/_temporary/attempt_local711750867_0001_m_000000_0/part-m-00000.orc with stripeSize: 67108864 options: Compress: ZLIB buffer: 262144
2023-01-19 12:00:24,161 INFO mapreduce.Job: Job job_local711750867_0001 running in uber mode : false
2023-01-19 12:00:24,162 INFO mapreduce.Job:  map 0% reduce 0%
2023-01-19 12:00:35,283 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:00:36,277 INFO mapreduce.Job:  map 5% reduce 0%
2023-01-19 12:00:41,287 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:00:41,334 INFO mapreduce.Job:  map 7% reduce 0%
2023-01-19 12:00:47,288 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:00:47,365 INFO mapreduce.Job:  map 10% reduce 0%
2023-01-19 12:00:53,292 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:00:53,429 INFO mapreduce.Job:  map 12% reduce 0%
2023-01-19 12:00:59,298 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:00:59,496 INFO mapreduce.Job:  map 14% reduce 0%
2023-01-19 12:01:05,306 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:05,536 INFO mapreduce.Job:  map 16% reduce 0%
2023-01-19 12:01:11,320 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:11,582 INFO mapreduce.Job:  map 19% reduce 0%
2023-01-19 12:01:17,321 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:17,640 INFO mapreduce.Job:  map 21% reduce 0%
2023-01-19 12:01:23,334 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:23,704 INFO mapreduce.Job:  map 24% reduce 0%
2023-01-19 12:01:29,344 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:29,746 INFO mapreduce.Job:  map 26% reduce 0%
2023-01-19 12:01:35,355 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:35,812 INFO mapreduce.Job:  map 29% reduce 0%
2023-01-19 12:01:41,359 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:41,876 INFO mapreduce.Job:  map 31% reduce 0%
2023-01-19 12:01:47,361 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:47,914 INFO mapreduce.Job:  map 34% reduce 0%
2023-01-19 12:01:53,362 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:53,951 INFO mapreduce.Job:  map 36% reduce 0%
2023-01-19 12:01:59,363 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:01:59,969 INFO mapreduce.Job:  map 39% reduce 0%
2023-01-19 12:02:05,366 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:06,039 INFO mapreduce.Job:  map 41% reduce 0%
2023-01-19 12:02:11,370 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:12,077 INFO mapreduce.Job:  map 44% reduce 0%
2023-01-19 12:02:17,377 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:18,114 INFO mapreduce.Job:  map 47% reduce 0%
2023-01-19 12:02:23,381 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:24,163 INFO mapreduce.Job:  map 50% reduce 0%
2023-01-19 12:02:29,386 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:30,216 INFO mapreduce.Job:  map 53% reduce 0%
2023-01-19 12:02:35,395 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:36,252 INFO mapreduce.Job:  map 56% reduce 0%
2023-01-19 12:02:41,397 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:42,302 INFO mapreduce.Job:  map 59% reduce 0%
2023-01-19 12:02:47,407 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:48,339 INFO mapreduce.Job:  map 62% reduce 0%
2023-01-19 12:02:53,420 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:02:54,366 INFO mapreduce.Job:  map 65% reduce 0%
2023-01-19 12:02:59,427 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:00,401 INFO mapreduce.Job:  map 68% reduce 0%
2023-01-19 12:03:05,432 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:05,466 INFO mapreduce.Job:  map 71% reduce 0%
2023-01-19 12:03:11,443 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:11,501 INFO mapreduce.Job:  map 74% reduce 0%
2023-01-19 12:03:17,461 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:17,526 INFO mapreduce.Job:  map 76% reduce 0%
2023-01-19 12:03:23,467 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:23,562 INFO mapreduce.Job:  map 80% reduce 0%
2023-01-19 12:03:29,467 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:29,594 INFO mapreduce.Job:  map 82% reduce 0%
2023-01-19 12:03:35,476 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:35,640 INFO mapreduce.Job:  map 85% reduce 0%
2023-01-19 12:03:41,499 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:41,693 INFO mapreduce.Job:  map 88% reduce 0%
2023-01-19 12:03:47,507 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:47,735 INFO mapreduce.Job:  map 91% reduce 0%
2023-01-19 12:03:53,507 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:53,774 INFO mapreduce.Job:  map 94% reduce 0%
2023-01-19 12:03:59,514 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:03:59,827 INFO mapreduce.Job:  map 97% reduce 0%
2023-01-19 12:04:05,074 INFO mapred.LocalJobRunner: map > map
2023-01-19 12:04:05,517 INFO mapred.LocalJobRunner: map
2023-01-19 12:04:05,868 INFO mapreduce.Job:  map 100% reduce 0%
2023-01-19 12:04:06,295 INFO mapred.Task: Task:attempt_local711750867_0001_m_000000_0 is done. And is in the process of committing
2023-01-19 12:04:06,297 INFO mapred.LocalJobRunner: map
2023-01-19 12:04:06,297 INFO mapred.Task: Task attempt_local711750867_0001_m_000000_0 is allowed to commit now
2023-01-19 12:04:06,312 INFO output.FileOutputCommitter: Saved output of task 'attempt_local711750867_0001_m_000000_0' to hdfs://hadoopha/test/hive/input/orc
2023-01-19 12:04:06,313 INFO mapred.LocalJobRunner: map
2023-01-19 12:04:06,313 INFO mapred.Task: Task 'attempt_local711750867_0001_m_000000_0' done.
2023-01-19 12:04:06,317 INFO mapred.Task: Final Counters for attempt_local711750867_0001_m_000000_0: Counters: 21
    File System Counters
        FILE: Number of bytes read=126
        FILE: Number of bytes written=515068
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=0
        HDFS: Number of bytes written=863288011
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=100000000
        Map output records=100000000
        Input split bytes=78
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=720
        Total committed heap usage (bytes)=8488747008
    mysql_records_counters
        User Records=100000000
    File Input Format Counters 
        Bytes Read=0
    File Output Format Counters 
        Bytes Written=863288011
2023-01-19 12:04:06,317 INFO mapred.LocalJobRunner: Finishing task: attempt_local711750867_0001_m_000000_0
2023-01-19 12:04:06,317 INFO mapred.LocalJobRunner: map task executor complete.
2023-01-19 12:04:06,881 INFO mapreduce.Job: Job job_local711750867_0001 completed successfully
2023-01-19 12:04:06,892 INFO mapreduce.Job: Counters: 21
    File System Counters
        FILE: Number of bytes read=126
        FILE: Number of bytes written=515068
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=0
        HDFS: Number of bytes written=863288011
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=100000000
        Map output records=100000000
        Input split bytes=78
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=720
        Total committed heap usage (bytes)=8488747008
    mysql_records_counters
        User Records=100000000
    File Input Format Counters 
        Bytes Read=0
    File Output Format Counters 
        Bytes Written=863288011
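
The read-back check omitted above does not need a second MapReduce job. The following is a minimal sketch (the class name Check_ReadOrcFile is made up for this example, and the HA settings are copied from the writer) that prints the size of the output directory plus each ORC file's row count, compression kind and schema; the row count should match the 100,000,000 records in the counters above.

package org.hadoop.mr.db2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

// Hedged sketch: verifies the ORC output written by ReadFromMysqlToOrcFileBySnappy.
public class Check_ReadOrcFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // same HDFS HA settings as the writer job
        conf.set("fs.defaultFS", "hdfs://hadoopha");
        conf.set("dfs.nameservices", "hadoopha");
        conf.set("dfs.ha.namenodes.hadoopha", "namenode29,namenode47");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode29", "server6:8020");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode47", "server7:8020");
        conf.set("dfs.client.failover.proxy.provider.hadoopha", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        Path dir = new Path("hdfs://hadoopha/test/hive/input/orc");
        FileSystem fs = dir.getFileSystem(conf);

        // total size of the output directory
        ContentSummary summary = fs.getContentSummary(dir);
        System.out.println("total bytes: " + summary.getLength());

        // open every ORC file and print its row count, compression kind and schema
        for (FileStatus status : fs.listStatus(dir)) {
            if (!status.getPath().getName().endsWith(".orc")) {
                continue;
            }
            Reader reader = OrcFile.createReader(status.getPath(), OrcFile.readerOptions(conf));
            System.out.println(status.getPath().getName() + " rows=" + reader.getNumberOfRows()
                    + " compression=" + reader.getCompressionKind() + " schema=" + reader.getSchema());
        }
    }
}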


5. Write the MySQL data to the HDFS cluster as Parquet files with Snappy compression

1) pom.xml

Same as above.

2) Code

1. Bean

Same as above.

2. Main program: read the MySQL data and write Parquet files with Snappy compression to the HDFS cluster

package org.hadoop.mr.db2;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Types;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.springframework.util.StopWatch;

/**
 * @author alanchan
 *
 */
public class ReadFromMysqlToParquetFileBySnappy extends Configured implements Tool {
    static String out = "hdfs://hadoopha/test/hive/input//parquet";

    public static void main(String[] args) throws Exception {
        StopWatch clock = new StopWatch();
        clock.start(ReadFromMysqlToParquetFileBySnappy.class.getSimpleName());

        Configuration conf = new Configuration();
        // Configure HDFS high-availability (HA) access
        conf.set("fs.defaultFS", "hdfs://hadoopha");
        conf.set("dfs.nameservices", "hadoopha");
//       namenode29,namenode47
        conf.set("dfs.ha.namenodes.hadoopha", "namenode29,namenode47");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode29", "server6:8020");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode47", "server7:8020");
        conf.set("dfs.client.failover.proxy.provider.hadoopha", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, "com.mysql.jdbc.Driver");
        conf.set(DBConfiguration.URL_PROPERTY, "jdbc:mysql://192.168.10.44:3306/test");
        conf.set(DBConfiguration.USERNAME_PROPERTY, "root");
        conf.set(DBConfiguration.PASSWORD_PROPERTY, "rootroot");

//        conf.set("mapreduce.map.output.compress", "true");
//        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");

        int status = ToolRunner.run(conf, new ReadFromMysqlToParquetFileBySnappy(), args);

        // print the stopwatch before exiting; after System.exit() these lines would be unreachable
        clock.stop();
        System.out.println(clock.prettyPrint());

        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // id int,userid int, name string,url string, create_time string
        MessageType schema = Types.buildMessage().required(PrimitiveTypeName.INT32).named("id").required(PrimitiveTypeName.INT32).named("userid").required(PrimitiveTypeName.BINARY)
                .as(OriginalType.UTF8).named("name").required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("url").required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
                .named("create_time").named("t_kafka_flink_user");

        System.out.println("[schema]==" + schema.toString());
        GroupWriteSupport.setSchema(schema, conf);

        Job job = Job.getInstance(conf, this.getClass().getName());
        job.setJarByClass(this.getClass());
        job.setInputFormatClass(DBInputFormat.class);

//        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.10.44:3306/test", "root", "rootroot");
        DBInputFormat.setInput(job, KafkaUser.class, "select id, userid,name,url,create_time  from t_kafka_flink_user ", "select count(*) from  t_kafka_flink_user ");

        job.setMapperClass(ReadFromMysqlToParquetFileBySnappyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Group.class);
        job.setOutputFormatClass(ParquetOutputFormat.class);
        ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
        ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);

        Path outputDir = new Path(out);
        outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, new Path(out));

        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class ReadFromMysqlToParquetFileBySnappyMapper extends Mapper<LongWritable, KafkaUser, NullWritable, Group> {
        SimpleGroupFactory factory = null;

        protected void setup(Context context) throws IOException, InterruptedException {
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
        };

        protected void map(LongWritable key, KafkaUser value, Context context) throws IOException, InterruptedException {
            Counter counter = context.getCounter("mysql_records_counters", "t_kafka_flink_user");
            counter.increment(1);

            Group user = factory.newGroup();
//            String[] strs = value.toString().split(",");
            user.append("id", value.getId());
            user.append("userid", value.getUserid());
            user.append("name", value.getName());
            user.append("url", value.getUrl());
            user.append("create_time", value.getCreate_time());

            context.write(null, user);

        }

    }

}

3. Verify that the file written to HDFS can be read back as Parquet

Write a small amount of data first, then read it back.

package org.hadoop.mr.db2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;
import org.springframework.util.StopWatch;

/**
 * Reads back the Parquet file that ReadFromMysqlToParquetFileBySnappy.java
 * wrote to HDFS from MySQL.
 * 
 * @author alanchan
 *
 */
public class Check_ReadParquetFile extends Configured implements Tool {
    static String in = "hdfs://hadoopha/test/hive/input//parquet";
    static String out = "hdfs://hadoopha/test/hive/input//parquet_read";

    public static void main(String[] args) throws Exception {
        StopWatch clock = new StopWatch();
        clock.start(Check_ReadParquetFile.class.getSimpleName());

        Configuration conf = new Configuration();
        // Configure HDFS high-availability (HA) access
        conf.set("fs.defaultFS", "hdfs://hadoopha");
        conf.set("dfs.nameservices", "hadoopha");
//               namenode29,namenode47
        conf.set("dfs.ha.namenodes.hadoopha", "namenode29,namenode47");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode29", "server6:8020");
        conf.set("dfs.namenode.rpc-address.hadoopha.namenode47", "server7:8020");
        conf.set("dfs.client.failover.proxy.provider.hadoopha", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        int status = ToolRunner.run(conf, new Check_ReadParquetFile(), args);

        // print the stopwatch before exiting; after System.exit() these lines would be unreachable
        clock.stop();
        System.out.println(clock.prettyPrint());

        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration(this.getConf());
        // id int,userid int, name string,url string, create_time string
        MessageType schema = Types.buildMessage().required(PrimitiveTypeName.INT32).named("id").required(PrimitiveTypeName.INT32).named("userid").required(PrimitiveTypeName.BINARY)
                .as(OriginalType.UTF8).named("name").required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("url").required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
                .named("create_time").named("t_kafka_flink_user");

        GroupWriteSupport.setSchema(schema, conf);
        Job job = Job.getInstance(conf, this.getClass().getName());
        job.setJarByClass(this.getClass());
        // Parquet input
        job.setMapperClass(Check_ReadParquetFileMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(ParquetInputFormat.class);
        ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class);
        FileInputFormat.setInputPaths(job, in);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        Path outputDir = new Path(out);
        outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, new Path(out));

        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class Check_ReadParquetFileMapper extends Mapper<NullWritable, Group, NullWritable, Text> {
        protected void map(NullWritable key, Group value, Context context) throws IOException, InterruptedException {
//            String city = value.getString(0, 0);
//            String ip = value.getString(1, 0);
//            context.write(NullWritable.get(), new Text(city + "," + ip));
            context.write(NullWritable.get(),
                    new Text(value.getInteger(0, 0) + "," + value.getInteger(1, 0) + "," + value.getString(2, 0) + "," + value.getString(3, 0) + "," + value.getString(4, 0)));
        }
    }
}

3) Verification

Check the main program's run result (the MapReduce job log, mainly the number of records read and written). (The write to HDFS completed in about 5 minutes; 10.78 GB of source data compressed down to 4.7 GB.)
Check that the output file exists in HDFS and check its size.
Check that the file written to HDFS can be read back through the Parquet format, using the reader program above.

[schema]==message t_kafka_flink_user {
  required int32 id;
  required int32 userid;
  required binary name (UTF8);
  required binary url (UTF8);
  required binary create_time (UTF8);
}

2023-01-19 14:37:09,691 WARN impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-jobtracker.properties,hadoop-metrics2.properties
2023-01-19 14:37:09,728 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-01-19 14:37:09,728 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-01-19 14:37:09,995 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
Thu Jan 19 14:37:10 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-01-19 14:38:11,288 INFO mapreduce.JobSubmitter: number of splits:1
2023-01-19 14:38:11,343 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1727911947_0001
2023-01-19 14:38:11,344 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-19 14:38:11,433 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2023-01-19 14:38:11,434 INFO mapreduce.Job: Running job: job_local1727911947_0001
2023-01-19 14:38:11,435 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2023-01-19 14:38:11,437 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2023-01-19 14:38:11,437 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2023-01-19 14:38:11,438 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.parquet.hadoop.ParquetOutputCommitter
2023-01-19 14:38:11,495 INFO mapred.LocalJobRunner: Waiting for map tasks
2023-01-19 14:38:11,495 INFO mapred.LocalJobRunner: Starting task: attempt_local1727911947_0001_m_000000_0
2023-01-19 14:38:11,507 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2023-01-19 14:38:11,507 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2023-01-19 14:38:11,513 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
2023-01-19 14:38:11,647 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@fcac36
Thu Jan 19 14:38:11 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-01-19 14:38:11,654 INFO mapred.MapTask: Processing split: org.apache.hadoop.mapreduce.lib.db.DBInputFormat$DBInputSplit@2d94b192
Thu Jan 19 14:38:11 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-01-19 14:38:11,665 INFO codec.CodecConfig: Compression: SNAPPY
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Dictionary is on
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Validation is off
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Maximum row group padding size is 8388608 bytes
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Page size checking is: estimated
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Min row count for page size check is: 100
2023-01-19 14:38:11,674 INFO hadoop.ParquetOutputFormat: Max row count for page size check is: 10000
2023-01-19 14:38:11,716 INFO compress.CodecPool: Got brand-new compressor [.snappy]
2023-01-19 14:38:12,445 INFO mapreduce.Job: Job job_local1727911947_0001 running in uber mode : false
2023-01-19 14:38:12,447 INFO mapreduce.Job:  map 0% reduce 0%
2023-01-19 14:38:23,529 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:38:24,519 INFO mapreduce.Job:  map 5% reduce 0%
2023-01-19 14:38:27,557 INFO hadoop.InternalParquetRecordWriter: mem size 134677286 > 134217728: flushing 7240100 records to disk.
2023-01-19 14:38:27,557 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134565867
2023-01-19 14:38:29,537 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:38:30,544 INFO mapreduce.Job:  map 7% reduce 0%
2023-01-19 14:38:35,543 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:38:35,593 INFO mapreduce.Job:  map 11% reduce 0%
2023-01-19 14:38:41,548 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:38:41,644 INFO mapreduce.Job:  map 13% reduce 0%
2023-01-19 14:38:43,667 INFO hadoop.InternalParquetRecordWriter: mem size 134676436 > 134217728: flushing 7290100 records to disk.
2023-01-19 14:38:43,667 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134366198
2023-01-19 14:38:47,551 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:38:47,679 INFO mapreduce.Job:  map 15% reduce 0%
2023-01-19 14:38:53,559 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:38:53,705 INFO mapreduce.Job:  map 18% reduce 0%
2023-01-19 14:38:59,566 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:38:59,735 INFO mapreduce.Job:  map 21% reduce 0%
2023-01-19 14:39:01,365 INFO hadoop.InternalParquetRecordWriter: mem size 134619747 > 134217728: flushing 7160100 records to disk.
2023-01-19 14:39:01,365 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134740152
2023-01-19 14:39:05,583 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:05,804 INFO mapreduce.Job:  map 22% reduce 0%
2023-01-19 14:39:11,592 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:11,850 INFO mapreduce.Job:  map 25% reduce 0%
2023-01-19 14:39:17,596 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:17,899 INFO mapreduce.Job:  map 28% reduce 0%
2023-01-19 14:39:19,559 INFO hadoop.InternalParquetRecordWriter: mem size 134447803 > 134217728: flushing 7270100 records to disk.
2023-01-19 14:39:19,559 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134602655
2023-01-19 14:39:23,599 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:23,949 INFO mapreduce.Job:  map 29% reduce 0%
2023-01-19 14:39:29,607 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:30,013 INFO mapreduce.Job:  map 32% reduce 0%
2023-01-19 14:39:35,612 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:36,052 INFO mapreduce.Job:  map 35% reduce 0%
2023-01-19 14:39:37,246 INFO hadoop.InternalParquetRecordWriter: mem size 134363246 > 134217728: flushing 7320100 records to disk.
2023-01-19 14:39:37,246 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134385006
2023-01-19 14:39:41,617 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:42,095 INFO mapreduce.Job:  map 38% reduce 0%
2023-01-19 14:39:47,626 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:48,155 INFO mapreduce.Job:  map 41% reduce 0%
2023-01-19 14:39:53,448 INFO hadoop.InternalParquetRecordWriter: mem size 134948348 > 134217728: flushing 7330100 records to disk.
2023-01-19 14:39:53,448 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134809203
2023-01-19 14:39:53,630 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:39:54,200 INFO mapreduce.Job:  map 44% reduce 0%
2023-01-19 14:39:59,636 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:00,228 INFO mapreduce.Job:  map 45% reduce 0%
2023-01-19 14:40:05,641 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:06,255 INFO mapreduce.Job:  map 48% reduce 0%
2023-01-19 14:40:11,475 INFO hadoop.InternalParquetRecordWriter: mem size 134982569 > 134217728: flushing 7330100 records to disk.
2023-01-19 14:40:11,476 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134843499
2023-01-19 14:40:11,650 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:12,284 INFO mapreduce.Job:  map 51% reduce 0%
2023-01-19 14:40:17,665 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:18,311 INFO mapreduce.Job:  map 53% reduce 0%
2023-01-19 14:40:23,669 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:24,359 INFO mapreduce.Job:  map 56% reduce 0%
2023-01-19 14:40:27,722 INFO hadoop.InternalParquetRecordWriter: mem size 134284420 > 134217728: flushing 7330100 records to disk.
2023-01-19 14:40:27,722 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134145200
2023-01-19 14:40:29,680 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:30,397 INFO mapreduce.Job:  map 59% reduce 0%
2023-01-19 14:40:35,682 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:36,457 INFO mapreduce.Job:  map 62% reduce 0%
2023-01-19 14:40:41,684 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:42,496 INFO mapreduce.Job:  map 64% reduce 0%
2023-01-19 14:40:44,002 INFO hadoop.InternalParquetRecordWriter: mem size 135019529 > 134217728: flushing 7340100 records to disk.
2023-01-19 14:40:44,002 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134629710
2023-01-19 14:40:47,697 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:48,524 INFO mapreduce.Job:  map 67% reduce 0%
2023-01-19 14:40:53,697 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:40:54,546 INFO mapreduce.Job:  map 70% reduce 0%
2023-01-19 14:40:59,704 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:00,140 INFO hadoop.InternalParquetRecordWriter: mem size 134782191 > 134217728: flushing 7340100 records to disk.
2023-01-19 14:41:00,140 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134392372
2023-01-19 14:41:00,564 INFO mapreduce.Job:  map 73% reduce 0%
2023-01-19 14:41:05,711 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:06,590 INFO mapreduce.Job:  map 75% reduce 0%
2023-01-19 14:41:11,714 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:12,640 INFO mapreduce.Job:  map 78% reduce 0%
2023-01-19 14:41:16,613 INFO hadoop.InternalParquetRecordWriter: mem size 134539152 > 134217728: flushing 7330100 records to disk.
2023-01-19 14:41:16,613 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134400057
2023-01-19 14:41:17,720 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:18,673 INFO mapreduce.Job:  map 80% reduce 0%
2023-01-19 14:41:23,724 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:24,710 INFO mapreduce.Job:  map 82% reduce 0%
2023-01-19 14:41:29,730 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:29,745 INFO mapreduce.Job:  map 85% reduce 0%
2023-01-19 14:41:34,138 INFO hadoop.InternalParquetRecordWriter: mem size 134491901 > 134217728: flushing 7240100 records to disk.
2023-01-19 14:41:34,138 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134248411
2023-01-19 14:41:35,732 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:35,798 INFO mapreduce.Job:  map 88% reduce 0%
2023-01-19 14:41:41,740 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:41,823 INFO mapreduce.Job:  map 90% reduce 0%
2023-01-19 14:41:47,744 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:47,874 INFO mapreduce.Job:  map 93% reduce 0%
2023-01-19 14:41:50,689 INFO hadoop.InternalParquetRecordWriter: mem size 134465932 > 134217728: flushing 7240100 records to disk.
2023-01-19 14:41:50,689 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134222492
2023-01-19 14:41:53,750 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:53,925 INFO mapreduce.Job:  map 95% reduce 0%
2023-01-19 14:41:59,760 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:41:59,960 INFO mapreduce.Job:  map 98% reduce 0%
2023-01-19 14:42:03,934 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:42:04,130 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 98436996
2023-01-19 14:42:05,367 INFO mapred.Task: Task:attempt_local1727911947_0001_m_000000_0 is done. And is in the process of committing
2023-01-19 14:42:05,371 INFO mapred.LocalJobRunner: map > map
2023-01-19 14:42:05,371 INFO mapred.Task: Task attempt_local1727911947_0001_m_000000_0 is allowed to commit now
2023-01-19 14:42:05,385 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1727911947_0001_m_000000_0' to hdfs://hadoopha/test/hive/input/parquet
2023-01-19 14:42:05,386 INFO mapred.LocalJobRunner: map
2023-01-19 14:42:05,386 INFO mapred.Task: Task 'attempt_local1727911947_0001_m_000000_0' done.
2023-01-19 14:42:05,390 INFO mapred.Task: Final Counters for attempt_local1727911947_0001_m_000000_0: Counters: 21
    File System Counters
        FILE: Number of bytes read=126
        FILE: Number of bytes written=518730
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=0
        HDFS: Number of bytes written=1841275806
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=100000000
        Map output records=100000000
        Input split bytes=78
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=1714
        Total committed heap usage (bytes)=8436842496
    mysql_records_counters
        t_kafka_flink_user=100000000
    File Input Format Counters 
        Bytes Read=0
    File Output Format Counters 
        Bytes Written=1841275806
2023-01-19 14:42:05,390 INFO mapred.LocalJobRunner: Finishing task: attempt_local1727911947_0001_m_000000_0
2023-01-19 14:42:05,393 INFO mapred.LocalJobRunner: map task executor complete.
2023-01-19 14:42:05,409 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
2023-01-19 14:42:06,018 INFO mapreduce.Job:  map 100% reduce 0%
2023-01-19 14:42:06,019 INFO mapreduce.Job: Job job_local1727911947_0001 completed successfully
2023-01-19 14:42:06,033 INFO mapreduce.Job: Counters: 21
    File System Counters
        FILE: Number of bytes read=126
        FILE: Number of bytes written=518730
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=0
        HDFS: Number of bytes written=1841275806
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=100000000
        Map output records=100000000
        Input split bytes=78
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=1714
        Total committed heap usage (bytes)=8436842496
    mysql_records_counters
        t_kafka_flink_user=100000000
    File Input Format Counters 
        Bytes Read=0
    File Output Format Counters 
        Bytes Written=1841275806


6、Create the table t_kafkauser_orc in hive and load the data

Hive's LOAD DATA only works for TEXTFILE tables; files in other formats (ORC, Parquet, etc.) cannot be loaded directly.
Data in other formats has to be written in another way, for example through MapReduce, through sqoop (as done below), or via a TEXTFILE staging table and INSERT ... SELECT, as sketched next.
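For reference, a minimal sketch of the staging-table route (this article goes through sqoop instead): load the delimited text into a TEXTFILE staging table, then rewrite it into the ORC table with INSERT ... SELECT. The staging table name and the source path below are hypothetical.

-- hypothetical TEXTFILE staging table with the same columns as t_kafka_flink_user
CREATE TABLE t_kafkauser_text_stage (id int, userid int, name string, url string, create_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- LOAD is fine here because the target table is TEXTFILE; the path is an assumption
LOAD DATA INPATH 'hdfs://hadoopha/test/hive/input/text/part-m-00000' INTO TABLE t_kafkauser_text_stage;

-- rewrite the rows into the ORC table created below
INSERT OVERWRITE TABLE t_kafkauser_orc
SELECT id, userid, name, url, create_time FROM t_kafkauser_text_stage;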

1)、Connect to hive

Hive is installed on 192.16.51.40. In your own environment any node will do, as long as Hive can access HDFS.

[root@server8 ~]# beeline
WARNING: Use "yarn jar" to launch YARN applications.

Beeline version 2.1.1-cdh6.2.1 by Apache Hive
beeline> ! connect jdbc:hive2://server8:10000
Connecting to jdbc:hive2://server8:10000
Enter username for jdbc:hive2://server8:10000: root
Enter password for jdbc:hive2://server8:10000: ********(rootroot)
Connected to: Apache Hive (version 2.1.1-cdh6.2.1)
Driver: Hive JDBC (version 2.1.1-cdh6.2.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ

0: jdbc:hive2://server8:10000> show databases;

+----------------+
| database_name  |
+----------------+
| default        |
| test           |
+----------------+
2 rows selected (0.309 seconds)
0: jdbc:hive2://server8:10000>

2)、Create the table

# 1. Create the database
0: jdbc:hive2://server8:10000> create database test_million_data;
INFO  : Compiling command(queryId=hive_20230119153905_c01b4514-4e96-4e7d-8ae9-3a8ae945e60a): create database test_million_data
。。。。
INFO  : OK
No rows affected (0.085 seconds)
0: jdbc:hive2://server8:10000> use test_million_data;
INFO  : Compiling command(queryId=hive_20230119153914_18af7552-49a4-46ad-b3be-4012672290b1): use test_million_data
。。。
INFO  : OK
No rows affected (0.038 seconds)
0: jdbc:hive2://server8:10000> show tables;
INFO  : Compiling command(queryId=hive_20230119153924_b69f8a86-7208-4ed4-9a3c-7d9a60e5db3f): show tables
。。。。。
INFO  : OK
+-----------+
| tab_name  |
+-----------+
+-----------+
No rows selected (0.051 seconds)

# 2. Create the table
CREATE TABLE t_kafkauser_orc (id int,userid int, name string,url string, create_time string) 
# row format delimited fields terminated by ","
STORED AS ORC;
 
0: jdbc:hive2://server8:10000> CREATE TABLE t_kafkauser_orc (id int,userid int, name string,url string, create_time string) STORED AS ORC;
。。。。
INFO  : OK
No rows affected (0.655 seconds)
0: jdbc:hive2://server8:10000> show tables;
INFO  : Compiling command(queryId=hive_20230119154526_a4789252-f314-4f5a-ae0b-61fded00a763): show tables
。。。。。
INFO  : OK
+------------------+
|     tab_name     |
+------------------+
| t_kafkauser_orc  |
+------------------+
1 row selected (0.063 seconds)
0: jdbc:hive2://server8:10000> select * from t_kafkauser_orc;
INFO  : Compiling command(queryId=hive_20230119154542_a8cc5f63-d3d8-4a8c-8519-a4e16b8bddb1): select * from t_kafkauser_orc
。。。。
INFO  : OK
+---------------------+-------------------------+-----------------------+----------------------+------------------------------+
| t_kafkauser_orc.id  | t_kafkauser_orc.userid  | t_kafkauser_orc.name  | t_kafkauser_orc.url  | t_kafkauser_orc.create_time  |
+---------------------+-------------------------+-----------------------+----------------------+------------------------------+
+---------------------+-------------------------+-----------------------+----------------------+------------------------------+
No rows selected (0.159 seconds)
0: jdbc:hive2://server8:10000>

3)、Import the data with sqoop

# The ORC data file written earlier is at hdfs://hadoopha/test/hive/input/orc/part-m-00000.orc
# The parquet data file is at hdfs://hadoopha/test/hive/input/parquet/part-m-00000.snappy.parquet
LOAD DATA INPATH 'hdfs://hadoopha/test/hive/input/orc/part-m-00000.orc' INTO TABLE t_kafkauser_orc;
# The LOAD above is wrong: Hive's LOAD only handles TEXTFILE, so ORC/Parquet files have to be written via MR or other tools instead

# The following imports the MySQL data directly into Hive via sqoop
sqoop import \
  --hcatalog-database test_million_data \
  --hcatalog-table t_kafkauser_orc \
  --connect jdbc:mysql://192.168.10.44:3306/test \
  --username "root" \
  --password "rootroot" \
  --table t_kafka_flink_user \
  --columns "id,userid,name,url,create_time"

4)、Verification

0: jdbc:hive2://server8:10000> select * from t_kafkauser_orc limit 3;
INFO  : Compiling command(queryId=hive_20230119173111_71929ab7-7bbb-44fd-aa5f-e5d6209f5257): select * from t_kafkauser_orc limit 3
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:t_kafkauser_orc.id, type:int, comment:null), FieldSchema(name:t_kafkauser_orc.userid, type:int, comment:null), FieldSchema(name:t_kafkauser_orc.name, type:string, comment:null), FieldSchema(name:t_kafkauser_orc.url, type:string, comment:null), FieldSchema(name:t_kafkauser_orc.create_time, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230119173111_71929ab7-7bbb-44fd-aa5f-e5d6209f5257); Time taken: 0.424 seconds
INFO  : Executing command(queryId=hive_20230119173111_71929ab7-7bbb-44fd-aa5f-e5d6209f5257): select * from t_kafkauser_orc limit 3
INFO  : Completed executing command(queryId=hive_20230119173111_71929ab7-7bbb-44fd-aa5f-e5d6209f5257); Time taken: 0.001 seconds
INFO  : OK
+---------------------+-------------------------+-----------------------+-------------------------+------------------------------+
| t_kafkauser_orc.id  | t_kafkauser_orc.userid  | t_kafkauser_orc.name  |   t_kafkauser_orc.url   | t_kafkauser_orc.create_time  |
+---------------------+-------------------------+-----------------------+-------------------------+------------------------------+
| 8045696             | 3                       | alan3_t               | https://www.win.com/3   | 2023-01-18 07:42:47.0        |
| 8045697             | 13                      | alan13_t              | https://www.win.com/13  | 2023-01-18 07:42:47.0        |
| 8045698             | 8                       | alan8_t               | https://www.win.com/8   | 2023-01-18 07:42:47.0        |
+---------------------+-------------------------+-----------------------+-------------------------+------------------------------+
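Beyond spot-checking a few rows, the table's storage format and location can be confirmed from the same beeline session; a quick, hedged check (output omitted here):

DESCRIBE FORMATTED t_kafkauser_orc;
-- the InputFormat/OutputFormat fields should report the ORC classes,
-- and Location points at the warehouse directory that sqoop wrote into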

7、Create the table t_kafkauser_parquet in hive and load the data

1)、Connect to hive

Same as above.

2)、Create the tables

# Uncompressed
CREATE TABLE t_kafkauser_parquet (id int,userid int, name string,url string, create_time string)
# row format delimited fields terminated by ","
STORED AS PARQUET;
# The tests below use the uncompressed parquet table: judging from the results, the uncompressed table queries somewhat faster but its files are larger, while the snappy-compressed table is smaller but slower to query. Compressed size: about 442.25 MB across 4 files. (A snappy-compressed ORC equivalent is sketched after the session output below.)

# Snappy-compressed
create table t_kafkauser_parquet_snappy(id int,userid int, name string,url string, create_time string) 
STORED AS PARQUET 
TBLPROPERTIES('parquet.compression'='SNAPPY');

0: jdbc:hive2://server8:10000> CREATE TABLE t_kafkauser_parquet (id int,userid int, name string,url string, create_time string) STORED AS PARQUET;
INFO  : Compiling command(queryId=hive_20230119173211_b43c5fdc-9b78-470d-9470-7c8dd6ddd4ff): CREATE TABLE t_kafkauser_parquet (id int,userid int, name string,url string, create_time string) STORED AS PARQUET
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20230119173211_b43c5fdc-9b78-470d-9470-7c8dd6ddd4ff); Time taken: 0.03 seconds
INFO  : Executing command(queryId=hive_20230119173211_b43c5fdc-9b78-470d-9470-7c8dd6ddd4ff): CREATE TABLE t_kafkauser_parquet (id int,userid int, name string,url string, create_time string) STORED AS PARQUET
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20230119173211_b43c5fdc-9b78-470d-9470-7c8dd6ddd4ff); Time taken: 0.073 seconds
INFO  : OK
No rows affected (0.118 seconds)
0: jdbc:hive2://server8:10000> show tables;
INFO  : Compiling command(queryId=hive_20230119173219_4e225e84-3563-472e-9ff4-58d36a269cd8): show tables
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230119173219_4e225e84-3563-472e-9ff4-58d36a269cd8); Time taken: 0.021 seconds
INFO  : Executing command(queryId=hive_20230119173219_4e225e84-3563-472e-9ff4-58d36a269cd8): show tables
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20230119173219_4e225e84-3563-472e-9ff4-58d36a269cd8); Time taken: 0.008 seconds
INFO  : OK
+----------------------+
|       tab_name       |
+----------------------+
| t_kafkauser_orc      |
| t_kafkauser_parquet  |
+----------------------+
2 rows selected (0.049 seconds)
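For symmetry with the two parquet variants above, a snappy-compressed ORC table can be declared in the same way; a hedged sketch (it is not part of the measurements in this article), using the standard orc.compress table property:

CREATE TABLE t_kafkauser_orc_snappy (id int, userid int, name string, url string, create_time string)
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');
-- without the property, Hive's ORC writer defaults to ZLIB compression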

3)、Import the data with sqoop

sqoop import \
  --hcatalog-database test_million_data \
  --hcatalog-table t_kafkauser_parquet \
  --connect jdbc:mysql://192.168.10.44:3306/test \
  --username "root" \
  --password "rootroot" \
  --table t_kafka_flink_user \
  --columns "id,userid,name,url,create_time"

4)、Verification

Note: the data files imported into Hive are fairly large, about 1.36 GB each and 4 files in total, located at /user/hive/warehouse/test_million_data.db/t_kafkauser_parquet.
The Java heap sizes of the Hive Metastore Server and of HiveServer2 were raised to 1 GB (the previous default was 153 MB), and the related memory settings, such as the YARN map/reduce container memory, may also need adjusting.
The YARN and MapReduce runtime memory configuration has to be sized properly as well, otherwise the MR jobs cannot run.
For the detailed configuration see:
https://note.youdao.com/s/3eR1dU1T
or
https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.3.4/bk_installing_manually_book/content/determine-hdp-memory-config.html
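As a rough per-session illustration of the MapReduce memory settings involved (the values below are placeholders, not the ones used on this cluster), they can also be overridden in beeline before running the import or the heavy queries:

SET mapreduce.map.memory.mb=2048;
SET mapreduce.map.java.opts=-Xmx1638m;
SET mapreduce.reduce.memory.mb=4096;
SET mapreduce.reduce.java.opts=-Xmx3276m;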

Exceptions observed:
    1. Hive console:
    Unknown HS2 problem when communicating with Thrift server.
    Error: org.apache.thrift.transport.TTransportException: java.net.SocketException: 断开的管道 (Write failed) (state=08S01,code=0)
    
    2. HiveServer2 log (hadoop-cmf-hive-HIVESERVER2-server8.log.out):
    ERROR org.apache.parquet.hadoop.ParquetRecordReader: [8977c893-4c90-4c6c-9ab9-0c19ba94a6ad HiveServer2-Handler-Pool: Thread-37]: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    
    3. Error reported in the CDH UI:
    The health test result for HIVE_HIVESERVER2S_HEALTHY  has become bad: Healthy HiveServer2: 0. Concerning HiveServer2: 0. Total HiveServer2: 1. Percent healthy: 0.00%. Percent healthy or concerning: 0.00%. Critical threshold: 51.00%.
    

0: jdbc:hive2://server8:10000> select count(id) from t_kafkauser_orc;
INFO  : Compiling command(queryId=hive_20230119173503_2dff23d2-a921-462d-b25b-b0f2ac7fc25d): select count(id) from t_kafkauser_orc
。。。
INFO  : OK
+------------+
|    _c0     |
+------------+
| 100000000  |
+------------+
1 row selected (86.123 seconds)

0: jdbc:hive2://server8:10000> select * from t_kafkauser_parquet limit 10;
INFO  : Compiling command(queryId=hive_20230201151918_4e3b20e7-ffb2-4f8a-97ac-41c5ab6add93): select * from t_kafkauser_parquet limit 10
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:t_kafkauser_parquet.id, type:int, comment:null), FieldSchema(name:t_kafkauser_parquet.userid, type:int, comment:null), FieldSchema(name:t_kafkauser_parquet.name, type:string, comment:null), FieldSchema(name:t_kafkauser_parquet.url, type:string, comment:null), FieldSchema(name:t_kafkauser_parquet.create_time, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230201151918_4e3b20e7-ffb2-4f8a-97ac-41c5ab6add93); Time taken: 0.492 seconds
INFO  : Executing command(queryId=hive_20230201151918_4e3b20e7-ffb2-4f8a-97ac-41c5ab6add93): select * from t_kafkauser_parquet limit 10
INFO  : Completed executing command(queryId=hive_20230201151918_4e3b20e7-ffb2-4f8a-97ac-41c5ab6add93); Time taken: 0.001 seconds
INFO  : OK
+-------------------------+-----------------------------+---------------------------+--------------------------+----------------------------------+
| t_kafkauser_parquet.id  | t_kafkauser_parquet.userid  | t_kafkauser_parquet.name  | t_kafkauser_parquet.url  | t_kafkauser_parquet.create_time  |
+-------------------------+-----------------------------+---------------------------+--------------------------+----------------------------------+
| 8045696                 | 3                           | alan3_t                   | https://www.win.com/3    | 2023-01-18 07:42:47.0            |
| 8045697                 | 13                          | alan13_t                  | https://www.win.com/13   | 2023-01-18 07:42:47.0            |
| 8045698                 | 8                           | alan8_t                   | https://www.win.com/8    | 2023-01-18 07:42:47.0            |
| 8045699                 | 1                           | alan1_t                   | https://www.win.com/1    | 2023-01-18 07:42:47.0            |
| 8045700                 | 14                          | alan14_t                  | https://www.win.com/14   | 2023-01-18 07:42:47.0            |
| 8045701                 | 12                          | alan12_t                  | https://www.win.com/12   | 2023-01-18 07:42:47.0            |
| 8045702                 | 7                           | alan7_t                   | https://www.win.com/7    | 2023-01-18 07:42:47.0            |
| 8045703                 | 2                           | alan2_t                   | https://www.win.com/2    | 2023-01-18 07:42:47.0            |
| 8045704                 | 6                           | alan6_t                   | https://www.win.com/6    | 2023-01-18 07:42:47.0            |
| 8045705                 | 0                           | alan0_t                   | https://www.win.com/0    | 2023-01-18 07:42:47.0            |
+-------------------------+-----------------------------+---------------------------+--------------------------+----------------------------------+
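Before moving on to the query comparison, the on-disk size of each table can also be checked from beeline with dfs commands; a hedged sketch, assuming the default warehouse layout (the ORC path mirrors the parquet path quoted above):

dfs -du -h /user/hive/warehouse/test_million_data.db/t_kafkauser_orc;
dfs -du -h /user/hive/warehouse/test_million_data.db/t_kafkauser_parquet;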

8、Refresh the hive tables in impala

1、Connect to impala
    [root@server7 ~]# impala-shell
    Starting Impala Shell without Kerberos authentication
    Opened TCP connection to server7:21000
    Connected to server7:21000
    Server version: impalad version 3.2.0-cdh6.2.1 RELEASE (build 525e372410dd2ce206e2ad0f21f57cae7380c0cb)
    ***********************************************************************************
    Welcome to the Impala shell.
    (Impala Shell v3.2.0-cdh6.2.1 (525e372) built on Wed Sep 11 01:30:44 PDT 2019)
    
    You can change the Impala daemon that you're connected to by using the CONNECT
    command.To see how Impala will plan to run your query without actually executing
    it, use the EXPLAIN command. You can change the level of detail in the EXPLAIN
    output by setting the EXPLAIN_LEVEL query option.
    ***********************************************************************************
2、Refresh the metadata for the tables created in hive (a statistics sketch follows the session output below)
    invalidate metadata;
3、A quick verification
    [server7:21000] default> show databases;
    Query: show databases
    +-------------------+----------------------------------------------+
    | name              | comment                                      |
    +-------------------+----------------------------------------------+
    | _impala_builtins  | System database for Impala builtin functions |
    | default           | Default Hive database                        |
    | test              |                                              |
    | test_million_data |                                              |
    +-------------------+----------------------------------------------+
    Fetched 4 row(s) in 0.17s
    [server7:21000] default> use test_million_data;
    Query: use test_million_data
    [server7:21000] test_million_data> show tables;
    Query: show tables
    +---------------------+
    | name                |
    +---------------------+
    | t_kafkauser_orc     |
    | t_kafkauser_parquet |
    +---------------------+
    Fetched 2 row(s) in 0.01s
    [server7:21000] test_million_data> select * from t_kafkauser_orc limit 10;
    Query: select * from t_kafkauser_orc limit 10
    Query submitted at: 2023-02-01 15:41:06 (Coordinator: http://server7:25000)
    Query progress can be monitored at: http://server7:25000/query_plan?query_id=b148ed102b7bbd91:fc8215f400000000
    +---------+--------+----------+------------------------+-----------------------+
    | id      | userid | name     | url                    | create_time           |
    +---------+--------+----------+------------------------+-----------------------+
    | 8045696 | 3      | alan3_t  | https://www.win.com/3  | 2023-01-18 07:42:47.0 |
    | 8045697 | 13     | alan13_t | https://www.win.com/13 | 2023-01-18 07:42:47.0 |
    | 8045698 | 8      | alan8_t  | https://www.win.com/8  | 2023-01-18 07:42:47.0 |
    | 8045699 | 1      | alan1_t  | https://www.win.com/1  | 2023-01-18 07:42:47.0 |
    | 8045700 | 14     | alan14_t | https://www.win.com/14 | 2023-01-18 07:42:47.0 |
    | 8045701 | 12     | alan12_t | https://www.win.com/12 | 2023-01-18 07:42:47.0 |
    | 8045702 | 7      | alan7_t  | https://www.win.com/7  | 2023-01-18 07:42:47.0 |
    | 8045703 | 2      | alan2_t  | https://www.win.com/2  | 2023-01-18 07:42:47.0 |
    | 8045704 | 6      | alan6_t  | https://www.win.com/6  | 2023-01-18 07:42:47.0 |
    | 8045705 | 0      | alan0_t  | https://www.win.com/0  | 2023-01-18 07:42:47.0 |
    +---------+--------+----------+------------------------+-----------------------+
    Fetched 10 row(s) in 4.32s
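The statistics sketch referenced in step 2: after INVALIDATE METADATA it usually also pays to collect table statistics, since Impala's planner relies on them. Whether to do this before the comparison is a judgment call, as it changes what the planner knows about the tables; a hedged, optional sketch:

COMPUTE STATS t_kafkauser_orc;
COMPUTE STATS t_kafkauser_parquet;
-- later, when only the data files of an existing table change,
-- REFRESH test_million_data.t_kafkauser_orc; is cheaper than a full INVALIDATE METADATA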

To be continued; click the link below to read part two.
1、通过亿级数据量在hive和impala中查询比较text、orc和parquet性能表现(二)

