0
点赞
收藏
分享

微信扫一扫

《Hadoop 权威指南》读书笔记之五 — Chapter 5

惠特曼 2022-01-26 阅读 89


《​​Hadoop​​​ 权威指南》读书笔记之五 — ​​Chapter 5​​ [updating…]

1.分布式数据处理中哪些方面使用到了 ​​Serialization​​(序列化)?


  • 1.for interprocess communication
  • 2.for persistne storage
    进程间通信 和 持久化存储
    在回答这个问题之前,我们首先需要思考的是:什么是序列化? 为什么需要序列化?
    针对上述两个问题的回答,可见我的博客:Java序列化详解

2.在Hadoop 中, 节点间不同进程的通信的方式是什么?

通过​​RPCs​​。所谓​​RPCs​​ 指的是:​​remote procedure calls​​。即远程方法调用。

同样,在看这个问题之前,我们需要了解:​​什么是RPCs​​?

关于什么是​​RPC​​的问题,可见我的博客:RPC详解

3.​​RPC​

通常情况下,​​RPC​​序列化格式最好具备如下四个条件:


  • 01.​​Compact​​【可压缩】
    带宽是分布式系统中的瓶颈,所以具有压缩格式的文件很重要。
  • 02.​​Fast​​【快速】
    进程间的通信在分布式系统中是非常关键的(可以说是核心)。所以尽量减少序列化和反序列化的步骤所带来的开销是必要的。
  • 03.​​Extensible​​【可扩展】
    IT不断变化,RPCs的格式要随着时代不断更新。
  • 04.​​InterOperable​​ 【共同协作】
    即使节点使用着不同的语言,不同的节点之间也需要相互协作。

虽然​​RPC​​​ 与 ​​persistent storage​​【数据的持久化存储】 不同,主要的不同点如下:

  • 为持久化存储的数据格式和序列化框架要求的数据格式是不同的。毕竟,一个​​RPC​​​的生命周期是小于1s的,然而持久化的数据却是在写之后的几年才被重新读取。【​​persistant data​​​ 经常需要保存数据数年】但是​​RPCs​​​序列化想要的四个特点同样适用于​​persistent storage​​:


We want the storage format to be compact (to make efficient use of storage space),
fast (so the overhead in reading or writing terabytes of data is minimal)
extensible (so we can transparently read data written in an older format)
and interoperable (so we can read or write persistent data using different languages).


基于上述原因,​​Hadoop​开发出自己所需要的序列化格式:​​Writables​​。

4.​​Writable​​的优缺点

优点:


which is certainly compact and fast,


缺点:


but not so easy to extend or use from languages other than Java.


原因:


Because Writables are central to Hadoop (most MapReduce programs use them for their key and value types)


译:​​Writables​​​ 是​​Hadoop​​​ 的核心,(大多数​​MapReduce​​程序为它们的key以及value类型使用它们)

5.​​Writables​​ 接口

5.1 接口详解

​Writable​​ 接口中 定义了两个方法:


  • 01.一个是写它的状态到一个DataOutput 字节流的方法​​write()​​;
  • 02.一个是从DataInput字节流中读取它的状态的方法​​readFields()​​;
    问:这里面【它的状态】是什么意思?

5.2 接口代码
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}

这个接口的详细文档可参考我的博客 Hadoop源码解析之Writable接口

5.3 实现自定义的 ​​Writable​

  • 01.所有实现 ​​Writable​​ 接口的类必须有一个无参的构造函数。
    原因是:为了 ​​MapReduce​​ 框架能够实例化,然后从 ​​readFields()​​ 方法中填充字段
  • 02.必须实现​​toString()​​方法
    原因是: ​​TextOutputFormat​​方法调用​​keys​​以及​​values​​ 的 ​​toString()​​方法输出。

6.​​RawComparator​

在详细介绍这个​​RawComparator​​之前,先看看如下几个问题:

6.1 为什么需要​​comparator​​?

​comparator​​ 顾名思义,其意思是:比较。那为什么需要这个比较呢?因为在​​MapReduce​​中,有许多排序的操作,而排序操作就涉及到比较。【只有比较之后才能排序嘛】例如:在 ​​shuffle​​ 的时候,就需要排序,然后把相同的key分发到同一个 ​​reduce​​中。

【这里又引出一个问题:​​Hadoop​​中有哪些过程涉及到了​​sort​​ 呢?等我博客哈。】

6.2 ​​RawComparator​​ 类简介

  • 01.继承自 Java 的 ​​Comparator​
  • 02.这个接口允许从流中比较记录,而不用反序列化成对象 => 这种能力就可以避免其减少对象创建的操作消耗


A Comparator that operates directly on byte representations of objects.


直接基于对象的字节数组上【这样就可以不用反序列化了】的比较操作的比较器(​​Comparator​​)

这么做的好处就是:

  • 减少创建对象所带来的开销
6.3 ​​RawComparator​​ 方法
  • ​compare()​​方法
int compare(byte[] b1,
int s1,
int l1,
byte[] b2,
int s2,
int l2)

Compare two objects in binary. b1[s1:l1] is the first object, and b2[s2:l2] is the second object.
Parameters:
b1 - The first byte array.
s1 - The position index in b1. The object under comparison's starting index.
l1 - The length of the object in b1.
b2 - The second byte array.
s2 - The position index in b2. The object under comparison's starting index.
l2 - The length of the object under comparison in b2.
Returns:
An integer result of the comparison.

​compare​​ 这个方法比较的是两个字节数组,我们都知道序列化其实就是 对象 -> 字节数组的过程;这里直接比较字节数组的所有位置是否相同,从而判断这两个对象是否相同。


WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes.


类​​WritableComparator​​ 是 ​​RawComparator​​ 的一个通用实现。【实在是不知道这个​​for WritableComparable classes​​该怎么翻译】

这个​​WritableComparator​​ 类有两个主要的功能:




First, it provides a default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method.



首先,它对原生的​​compare()​​​方法提供一个默认的实现,从而从流中序列化成一个可比较的对象,并且调用​​compare()​​方法




Second, it acts as a factory for RawComparator instances (that Writable implementations have registered)



第二,它作为一个​​RawComparator​​ 实例的工厂方法

7. ​​WritableComparable​

7.1 ​​WritableComparable​​ 简介

01.​​extends Writable , Comparable​

7.2 ​​comparator​​​ 与 ​​comparable​​ 到底是什么区别?

8.​​String​​​ 和 ​​Text​​ 的区别

同理,在问区别之前,我们先谈谈两者到底是什么?

8.1 ​​String​​是什么?

  • ​String​​ 并非是java的基本类型
  • ​the String class represents character strings.​​​​String​​类代表字符串。

8.2 ​​Text​​是什么?


Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent of java.lang.String.


​Text​​​ 是一个​​Writable​​​ 针对 ​​UTF-8​​​序列。它可以被看作是 ​​java.lang.String​​​的 ​​Writbale​​ 等价物。


The Text class uses an int (with a variable-length encoding) to store the number of bytes in the string encoding, so the maximum value is 2 GB.


​Text​​​ 使用一个int 型数 (使用可变长度编码)去存储字符串编码的字节数目,所有最大的值是 ​​2GB​​【2^31B = 2GB】


Text uses standard UTF-8, which makes it potentially easier to interoperate with other tools that understand UTF-8


​Text​​​ 使用标准的​​UTF-8​​​编码,这个可以潜在的和其它使用​​UTF-8​​编码的工具进行交互操作。

8.3 两者的区别

  • 01.二者的编码方式不同
  • 02.二者的计算方式不同( ​​String​​ 采取的是使用字符数,而 ​​Text​​ 采取的是字节数。=> 进而导致了 ​​length()​​, ​​index(), charAt()​​ 等方法的差异)
  • 03.可变性不同。
    String对象不可重用;但是Text 对象却是可以重用的;
    那么问题又来了,String 对象不可重用的意思是什么?

9. ​​NullWritable​​ 类有什么作用?


  • 01.它也是 ​​Writable​​ 的实现类,但是它是不具有任何长度的序列化
  • 02.它经常被用作一个占位符,用于在 ​​MapReduce​​ 中
  • 03.可以通过 ​​NullWritable.get()​​ 方法获取其实例

10.​​The SequenceFile format​

10.1 前言


For some applications, you need a specialized data structure to hold your data. For doing MapReduce-based processing, putting each blob of binary data into its own file doesn’t scale, so Hadoop developed a number of higher-level containers for these situations.


译:对于一些应用来说,你需要指定数据结构去保存你的数据。 对于执行基于​​MapReduce​​​ 的处理来说,将每个二进制数据大对象放到自己的文件中是不具备扩展性的,所以​​Hadoop​​专为这些场景研发出了许多 高级的容器。

10.2 为什么需要​​SequenceFile​​ ?


If you want to log binary types, plain text isn’t a suitable format. Hadoop’s SequenceFile class fits the bill in this situation, providing a persistent data structure for binary key-value pairs.


如果你想使用二进制类型去记录日志,纯文本不是一个合适的格式。 ​​Hadoop​​​ 的 ​​SequenceFile​​​ 类满足这个场景,提供一个持久的数据结构用于二进制的 ​​key-value​​ 对。

10.3 如何使用这个类?


To use it as a logfile format, you would choose a key, such as timestamp represented by a LongWritable, and the value would be a Writable that represents the quantity being logged.


这个类怎么用?


  • 可以使用这个类作为日志格式,那么1):你需要选择一个key, 诸如 由一个​​LongWritable​​ 类型的​​timestamp​​ 2):同时值将会是一个​​Writable​​类型, 代表的是要记录的数量【要记录的数量是什么意思?】
  • 作为小文件存储的容器(​​work well as containers for smaller files​​)

10.4 如何写一个​​SequenceFile​​ ?


To create a SequenceFile



  • use one of its createWriter() static methods, which return a SequenceFile.Writer instance.
  • specify a stream to write to
  • either an ​​FSDataOutputStream​​​ or a ​​FileSystem​​​ and ​​Path​​ pairing
  • a ​​Configuration​​ object, and the key and value types

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.net.URI;


public class SequenceFileWriteDemo {
public static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};

public static void main(String[] args) throws IOException {
if (args.length == 0) {
System.out.println("no args!");
System.exit(1);
}

//step1: get uri, and get a Path instance through uri
String uri = args[0];
Path path = new Path(uri);

//step 2:get a configuration
Configuration conf = new Configuration();
//step 3:get a FileSystem instance by uri and conf
FileSystem fs = FileSystem.get(URI.create(uri), conf);

System.out.println("path.name "+path.getName()+path.getParent());

//step 4:get key and value
IntWritable key = new IntWritable();
Text value = new Text();

//step 5:get a SequenceFile writer
SequenceFile.Writer writer = null;

//step 6: get the instance through fs,conf,path,key.Class,value.Class
writer = SequenceFile.createWriter(fs, conf, path,key.getClass(),value.getClass());

//step 7: write data into file
for( int i = 0;i < 2;i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n",writer.getLength(),key,value);
writer.append(key,value);
}

//step 8:close the writer
IOUtils.closeStream(writer);
}
}

上面这个类执行的效果如下:

[root@server4 thumbs]# hdfs dfs -cat /output/sequenceFile
SEQ org.apache.hadoop.io.IntWritableorg.apache.hadoop.io.Text*org.apache.hadoop.io.compress.DefaultCodec㡑&³Y9i盠%dx?Q()ЗQH*MώIUȭT(ψOx'cx
ǨJMԑHɯ-ё(̨-Q(ȈUHʏ/}¸ `XshellXshell[root@server4 thumbs]#

可以看到,这个查看文件之后得到的效果是乱码。因为二进制的文件本来就不是可以使用 ​​cat​​​ 方法查看的。既然使用​​cat​​​ 方法看不到​​sequence File​​​,那么该怎么查看​​sequence file​​呢?

10.5 ​​Reading a SequenceFile​


Reading sequence files from beginning to end is a matter of creating an instance of SequenceFile.Reader and iterating over records by repeatedly invoking one of the next() methods.


从头到尾读取一个​​sequence files​​​ 是​​SequenceFile.Reader​​​的任务,并且可以通过重复调用 ​​next()​​​ 方法遍历一个记录。这个​​next()​​ 方法会针对你的序列化方式调用相应的方法即可。

package hadoopDefinitiveGuide.chapter_5;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.net.URI;

public class SequenceFileReadDemo {
public static void main(String[] args) throws IOException {
String uri = args[0];
Path path = new Path(args[0]);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);

SequenceFile.Reader reader = null;
reader = new SequenceFile.Reader(fs,path,conf);

Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";
System.out.printf("[%s%s]\t%s\t%s\n",position,syncSeen,key,value);
position = reader.getPosition();
}
IOUtils.closeStream(reader);
}
}

执行结果如下:

《Hadoop 权威指南》读书笔记之五 — Chapter 5_java


Another feature of the program is that it displays the positions of the sync points in the sequence file. A sync point is a point in the stream that can be used to resynchronize with a record boundary if the reader is “lost” — for example, after seeking to an arbitrary position in the stream. Sync points are recorded by SequenceFile.Writer, which inserts a special entry to mark the sync point every few records as a sequence file is being written. Such entries are small enough to incur only a modest storage overhead — less than 1%. Sync points always align with record boundaries


译:上述程序使用到的另一个特性是: 展示了​​sequence file​​ 中的同步点的位置。

03.可以使用MapReduce 这个对SequenceFile 进行排序以及合并

10.6 ​​SequenceFile format​
10.6.1 ​​record​


A sequence file consists of a header followed by one or more records


《Hadoop 权威指南》读书笔记之五 — Chapter 5_hadoop_02

可以看到,这里的一个​​Header​​后面有两个​​Record​​,然后是一个​​Sync​​…

在​​Header​​ 中主要是如下三个部分:


  • first three bytes of a sequence file are the bytes SEQ, which act as a magic number
  • these are followed by a single byte representing the version number
  • contains other fields: name of key and vlaue classes, compression details[codec], user-defined metadata, the sync marker.

这里的​​Sync​​​ 就是上述的​​Sync point​​。


sync marker is used to allow a reader to synchronize to a record boundary from any position in the file。


​sync maker​​ 被用于允许一个 reader从文件的任何一个位置同步到一个记录边界。

​sync maker​​有什么特点呢?


  • Each file has a randomly generated sync marker, whose value is stored in the header.
  • Sync markers appear between records in the sequence file.
  • They are designed to incur less than a 1% storage overhead, so they don’t necessarily appear between every pair of records (such is the case for short records).

其中的 ​​record​​又分为 ​​record compression​​ 和 ​​record no compression​​ 两种。 而是否压缩取决于压缩功能是否开启了。默认是不开启压缩。

在不开启压缩的情况下,每个​​record​​ 的组成部分如下:


  • record length [4 B IntWritable]
  • key length [4B IntWritable]
  • value

如果开启了​​record​​​压缩,每个​​record​​​的组成部分大致相同,只不过​​value​​​是被定义在​​header​​​中的​​codec​​压缩了。但是需要注意的是 键不压缩

10.6.2 ​​block​


Block compression compresses multiple records at once; it is therefore more compact than and record compression
should generally be preferred over record compression because it has the opportunity to take advantage of similarities between records.


【我对为什么​​block compression​​​可以利用​​records​​间的相似性 有些疑问。】


Records are added to a block until it reaches a minimum size in bytes, defined by the io.seqfile.compress.blocksize property; the default is one million bytes.



A sync marker is written before the start of every block.


那么​​block​​ 是怎么组成的呢?由如下4个部分:


  • the key lengths
  • the keys
  • the value lengths
  • and the values

《Hadoop 权威指南》读书笔记之五 — Chapter 5_apache_03



举报

相关推荐

0 条评论