0
点赞
收藏
分享

微信扫一扫

Okio源码框架分析

android 开发的一定听过 square 这家公司,也肯定使用过他们家的产品,比如 retrofit,picasso,okhttp,okio等等。
今天我们就来分析 Okio 这个开源框架的实现原理,它本来是大名鼎鼎的处理http 请求的开源框架 OkHttp 的一部分,后来被独立出来,专门来处理 javaIO 流的。

一. Source 和 Sink

Okio 是用来处理 IO 流的,那么最重要的两个类就是 SourceSinkSource 相当于 InputStream, Sink 相当于 OutputStream

1.1 Source 和 InputStream

我们说 Source 相当于 InputStream,也就是输入流,它的作用就是从流中读取数据。
怎么读取数据,就先看看它们两个的源码:

public interface Source extends Closeable {
/**
* Removes at least 1, and up to {@code byteCount} bytes from this and appends
* them to {@code sink}. Returns the number of bytes read, or -1 if this
* source is exhausted.
*/

long read(Buffer sink, long byteCount) throws IOException;

/** Returns the timeout for this source. */
Timeout timeout();

/**
* Closes this source and releases the resources held by this source. It is an
* error to read a closed source. It is safe to close a source more than once.
*/

@Override void close() throws IOException;
}

Source 只有三个方法:

  • long read(Buffer sink, long byteCount):从流中读取至少1个最多byteCountbyte 字节数据,存到 sink 中,返回读取的byte 字节数, 返回 -1 表示已经到达输入流的末尾,流中没有可用的字节。
  • timeout :设置这个输入流 Source 的超时时间。
  • close :关闭这个输入流 Source ,释放资源。
public abstract class InputStream implements Closeable {
public abstract int read() throws IOException;
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
public int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

int c = read();
if (c == -1) {
return -1;
}
b[off] = (byte)c;

int i = 1;
try {
for (; i < len ; i++) {
c = read();
if (c == -1) {
break;
}
b[off + i] = (byte)c;
}
} catch (IOException ee) {
}
return i;
}
public long skip(long n) throws IOException {

long remaining = n;
int nr;

if (n <= 0) {
return 0;
}

int size = (int)Math.min(MAX_SKIP_BUFFER_SIZE, remaining);
byte[] skipBuffer = new byte[size];
while (remaining > 0) {
nr = read(skipBuffer, 0, (int)Math.min(size, remaining));
if (nr < 0) {
break;
}
remaining -= nr;
}

return n - remaining;
}
public int available() throws IOException {
return 0;
}
public void close() throws IOException {}
public synchronized void mark(int readlimit) {}
public synchronized void reset() throws IOException {
throw new IOException("mark/reset not supported");
}
public boolean markSupported() {
return false;
}
}

InputStream 的方法比较多:

  • abstract int read() : 从输入流读取数据的下一个字节。返回值是以0到255的整数。如果由于到达流的末尾而没有可用的字节,则返回值-1。
  • int read(byte b[])int read(byte b[], int off, int len) :从输入流中读取一定数量的字节数据,存入b[] 字节数组中,并返回读取字节个数。如果返回 -1 表示已经到达输入流的末尾,流中没有可用的字节。
  • long skip(long n) :跳过并丢弃该输入流中的n字节数据,但是有可能并不能跳过这么多字节数据,返回值是实际跳过的字节个数。
  • int available() :返回可从该输入流读取(或跳过)的字节数的估计值,默认返回 0,所以它强烈依赖子类的实现。
  • close():关闭输入流 ,释放资源。
  • mark(int readlimit),reset(),markSupported() :让输入流可以重读。

仔细比较 SourceInputStream ,你会发现:

  • Source 有一个获取超时时间的方法,对输入流的读取可以进行超时控制。
  • Source 只有一个 read 方法,从输入流中读取多个字节数据;而 InputStream 即可以读取单个字节数据,也可以读取多个字节数据。
  • InputStreamskip(long n) 方法,虽然 Source 中没有,但是它的子类 BufferedSource 中提供了skip(long byteCount) 方法。
  • InputStreamavailable() 方法,但是这本身就是一个有缺陷的方法; Source 中取消了这个方法。
  • InputStreammark(), reset,markSupported方法,它们也是不安全的方法, Source 中也取消了这些方法。

1.2 Sink 和 OutputStream

我们说 Sink 相当于OutputStream,也就是输出流,它的作用就是向流中写入数据。
怎么写入数据,就先看看它们两个的源码:

public interface Sink extends Closeable, Flushable {
/** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
void write(Buffer source, long byteCount) throws IOException;

/** Pushes all buffered bytes to their final destination. */
@Override void flush() throws IOException;

/** Returns the timeout for this sink. */
Timeout timeout();

/**
* Pushes all buffered bytes to their final destination and releases the
* resources held by this sink. It is an error to write a closed sink. It is
* safe to close a sink more than once.
*/

@Override void close() throws IOException;
}

Sink 只有四个方法:

  • void write(Buffer source, long byteCount):从 source 中移除 byteCount 个字节数据写入输出流Sink 中。
  • flush():将所有缓冲区字节数据推到它们的最终目的地。
  • timeout():设置这个输出流 Sink 的超时时间。
  • close() :关闭输出流 ,释放资源。
public abstract class OutputStream implements Closeable, Flushable {
public abstract void write(int b) throws IOException;
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
public void write(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
for (int i = 0 ; i < len ; i++) {
write(b[off + i]);
}
}
public void flush() throws IOException {
}
public void close() throws IOException {
}
}
  • abstract void write(int b):将指定的字节写入输出流。
  • write(byte b[])write(byte b[], int off, int len):将指定字节数组中的数据写入到输出流中。
  • flush(): 将所有缓冲区字节数据推到它们的最终目的地。
  • close():关闭输出流 ,释放资源。

仔细比较 SinkOutputStream ,你会发现:

  • Sink 有一个获取超时时间的方法,对输入流的读取可以进行超时控制。
  • Sink 只有写入多个字节数据的方法,而 OutputStream 既有写入单个字节的方法,也有写入多个字节数据的方法。

1.3 小结

SourceSink 都比较简单,方法很少。

  • Source有一个 read(Buffer sink, long byteCount) 方法,表示从当前输入流 Source 中读取多个字节数据,存入 sink 中。
  • Sink 有一个 write(Buffer source, long byteCount) 方法,表示将 source 中的 byteCount 个字节数据写入当前输出流Sink 中。

你会发现有一个重要类 Buffer ,它在SourceSink 的方法中都出现了,那它的作用是什么?

而仔细阅读它的源码,你会惊讶地发现,虽然Buffer 中方法非常多,但是它的成员变量只有两个:

  • long size:表示当前缓存区 Buffer 的大小,也就是拥有的字节数量。
  • Segment head:表示一个链表,用它来存储缓存区的字节数据。

所以我们先介绍 Segment

二. Segment

我们知道 Buffer 靠一个链表来存储缓存区的字节数据,而这个链表中的每个节点都是一个 Segment 对象。
Segment 表示缓存区Buffer的一个片段,下面我们来分析这个类。

2.1 成员属性

  /** The size of all segments in bytes. */
static final int SIZE = 8192;

/** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
static final int SHARE_MINIMUM = 1024;

final byte[] data;

/** The next byte of application data byte to read in this segment. */
int pos;

/** The first byte of available data ready to be written to. */
int limit;

/** True if other segments or byte strings use the same byte array. */
boolean shared;

/** True if this segment owns the byte array and can append to it, extending {@code limit}. */
boolean owner;

/** Next segment in a linked or circularly-linked list. */
Segment next;

/** Previous segment in a circularly-linked list. */
Segment prev;
  • SIZE = 8192: 表示片段 Segment 拥有的字节数组 data 的长度就是 8192
  • SHARE_MINIMUM = 1024:在做片段切割的时候(即 split 方法),超过这个值,那么切割出一个共享片段,小于这个值就是新的独立小片段。
  • byte[] data:用来存储字节数据的字节数组,它的大小就是SIZE(8192)。
  • int pos:表示片段读取的下一个字节位置。
  • int limit:表示片段写入字节位置。
  • boolean shared:表示当前片段是共享的片段。就是多个片段的字节数组data 是同一个引用。
  • boolean owner:表示当前片段拥有字节数组data的所有权。
  • Segment next : 表示当前片段的下一个片段(Segment)。
  • Segment prev : 表示当前片段的前一个片段(Segment)。

这些字段的作用:

  • data,poslimit 这三个属性就可以表示当前片段储存的可用数据了。
  • nextprev 可以形成一个双向链表。
  • sharedowner ,有些人不理解为什么要两个字段,一个共享属性shared 不就可以了么。

2.2 构造方法

  Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}

Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.shared = shared;
this.owner = owner;
}

如果是空惨构造,那么创建的Segment就是独享的片段;否则就根据传递的参数来创建Segment

2.3 sharedCopy 方法

  final Segment sharedCopy() {
shared = true;
return new Segment(data, pos, limit, true, false);
}

返回当前片段的共享片段,当前片段的 shared 属性也设置为 true

2.4 unsharedCopy 方法

  /** Returns a new segment that its own private copy of the underlying byte array. */
final Segment unsharedCopy() {
return new Segment(data.clone(), pos, limit, false, true);
}

用当前片段生成一个独享片段,所以使用 data.clone() 复制出一个新的字节数组,这样就与当前片段的字节数组data 没有关联了。

2.5 pop 方法

  /**
* Removes this segment of a circularly-linked list and returns its successor.
* Returns null if the list is now empty.
*/

public final @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}

将当前片段从双向链表中删除,并返回链表中下一个片段。

2.6 push 方法

  /**
* Appends {@code segment} after this segment in the circularly-linked list.
* Returns the pushed segment.
*/

public final Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}

在双向链表中,当前片段后面插入传入片段 segment,并返回这个插入片段segment

2.7 split 方法

  public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;

// We have two competing performance goals:
// - Avoid copying data. We accomplish this by sharing segments.
// - Avoid short shared segments. These are bad for performance because they are readonly and
// may lead to long chains of short segments.
// To balance these goals we only share segments when the copy will be large.
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy();
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}

prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}

将当前片段根据 byteCount 值分成两个片段,第一个片段包含[pos..pos+ byteccount]中的数据。第二个片段包含[pos+ byteccount ..limit]中的数据。

我们来分析方法流程:

  • 首先判断分割片段 byteCount 的值,小于等于0 或者大于当前片段可用数据的大小(即 limit - pos),那么直接抛出异常。
  • 因为我们要将当前片段分割成两个片段,那么有两个选择,一是分裂出一个独享片段,一是分裂出一个共享片段。
  • prefix.limit = prefix.pos + byteCount 修改新分裂片段的limit 值。
  • pos += byteCount 将当前片段的 pos 值修改,因为当前片段一部分值已经分裂出去了。
  • prev.push(prefix) 将分裂的片段插入到当前片段的前面。
  • 最后返回新分裂的片段,而它的下一个片段就是当前片段。

2.8 compact 方法

  /**
* Call this when the tail and its predecessor may both be less than half
* full. This will copy data so that segments can be recycled.
*/

public final void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}

将当前片段和它的前一个片段的可用数据进行合并,来回收多余片段。
方法流程:

  • prev == this当链表只有当前片段,肯定不能进行合并,这里直接报错。
  • !prev.owner 当前片段前一个片段的 owner == false,表示不能写入数据,那么不能合并,直接返回。
  • byteCount = limit - pos 获取当前片段的可用数据大小。
  • availableByteCount 表示当前片段前一个片段剩余可写入数据的大小。
  • 比较 byteCountavailableByteCount 的值,如果没有足够空间写入,那么就不能合并,直接返回。
  • writeTo(prev, byteCount) 将当前片段中的可用数据写入到前一个片段prev 中。
  • pop() 因为当前片段的数据已经写入到前一个片段中了,那么就从链表中删除当前片段。
  • SegmentPool.recycle(this) 片段池中回收当前片段。

2.9 writeTo 方法

  public final void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}

System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}

byteCount 个字节数据从当前片段移动到片段sink 中。
方法流程:

  • 首先如果片段sink 不能写入,那么直接抛出异常。
  • sink.limit + byteCount > SIZE 表示片段sink的剩余位置不够,那么就要进行数据的移动操作。
    • 如果片段sink 是共享片段,直接抛出异常,因为共享片段不能移动。
    • 如果加上 sink.pos 后仍然没有足够的写入空间,那么也直接抛出异常。
    • 使用 System.arraycopy 方法,将片段sink 的数据移动到字节数组data 的开头,以便腾出足够的位置写入。
    • 改变片段 sinklimitpos 的值。
  • 使用 System.arraycopy 方法将当前片段中 byteCount 个字节数据复制到片段 sink 中。
  • 修改当前片段的 pos 值和片段 sinklimit值。

三. SegmentPool

分析完 Segment,我们简单分析一下 SegmentPool, 它表示 Segment 的实例池。

这个类基本上都是静态属性和静态方法。

3.1 静态属性

  /** The maximum number of bytes to pool. */
// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
static final long MAX_SIZE = 64 * 1024; // 64 KiB.

/** Singly-linked list of segments. */
static @Nullable Segment next;

/** Total bytes in this pool. */
static long byteCount;
  • MAX_SIZE:表示这个实例池拥有的最大字节数。
  • next:通过next 形成一个单向链表,存储实例池中所有的 Segment 对象实例。
  • byteCount:当前这个实例池拥有的字节数。

3.2 take 方法

  static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}

从实例池中获取 Segment 对象实例,如果 next == null,表示实例池为空,那么就创建一个新的Segment 对象实例返回。

3.3 recycle 方法

  static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}

回收Segment 对象实例。
方法流程

  • 如果 segmentnext或者 prev 不为 null,说明还有其他segment 对象引用着它,不能回收,直接抛出异常。
  • 如果 segment 是共享片段,它不能回收的,因为回收之后,被别的地方使用了,就有可能修改字节数组 data 的数据,就会影响共享数据的。
  • 如果实例池已经满了,也不能回收实例了。
  • 上面判断都不满足,那么就回收这个Segment 对象实例。

四. Buffer

缓存区Buffer,你可以把它想象成一个无限的字节数组(Segment 的链表实现)。
那么对缓存区Buffer就两种:

  • 向缓存区Buffer中存入字节数据。
  • 从缓存区Buffer读取之前存入的字节数据。

那么有人要问了,那我干嘛要用Buffer,直接用字节数组byte[] 不就可以了么?

而缓存区Buffer 就解决了上面问题,通过Segment链表来动态调整存储数据大小,通过SegmentPool 来重复使用片段Segment

并且缓存区Buffer提供了很多方法,将各个来源的字节数据存入到缓存区中,或者将缓存区中存储的字节数据写入到其他地方。

4.1 与IO流的交互

它既可以从输入流中读取数据,存到它字节数组中;也可以将存储的数据写入到输出流中。

4.1.1 输入流中读取数据

  /** Read and exhaust bytes from {@code in} to this. */
public final Buffer readFrom(InputStream in) throws IOException {
readFrom(in, Long.MAX_VALUE, true);
return this;
}

/** Read {@code byteCount} bytes from {@code in} to this. */
public final Buffer readFrom(InputStream in, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
readFrom(in, byteCount, false);
return this;
}

private void readFrom(InputStream in, long byteCount, boolean forever) throws IOException {
if (in == null) throw new IllegalArgumentException("in == null");
while (byteCount > 0 || forever) {
Segment tail = writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) {
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
head = tail.pop();
SegmentPool.recycle(tail);
}
if (forever) return;
throw new EOFException();
}
tail.limit += bytesRead;
size += bytesRead;
byteCount -= bytesRead;
}
}

主要分析最后一个方法,它有三个参数:

  • InputStream in:输入流,从输入流中读取字节数据存入缓存区Buffer 中。
  • long byteCount:需要读取的字节个数。
  • boolean forever:是否将整个输入流中的数据都读取。如果它为'true',那么不用管 byteCount 值,直接将输入流中的数据都读完。

方法流程分析

  • 通过 while 循环,从输入流中读取想要个数的字节数据。
  • writableSegment(1) 方法,返回当前缓存区BufferSegment 链表尾的片段tail,向链表尾追加从输入流中读取的数据。
  • maxToCopy 是当前链表尾tail 剩余可写入数据大小(Segment.SIZE - tail.limit)和想要读取字节个数byteCount 中的较小值。
  • 通过 in.read(tail.data, tail.limit, maxToCopy) 方法,从输入流中读取数据,并存入链表尾片段tail 的字节数组中,返回实际读取的字节数据的个数 bytesRead
  • 如果bytesRead == -1 ,说明输入流in 中已经没有可读取的字节数据了,也就是说读取到输入流的结尾了(EOF)。
    • 需要判断 tail.pos == tail.limit, 回收链表尾片段tail
    • 如果 forever == true,那么就直接返回,否则就抛出读取到结尾的异常EOFException
  • 如果 bytesRead 不等于-1,说明有bytesRead个字节数据存入到链表尾片段tail的字节数组data 中了:
    • 那么将片段taillimit的值增加bytesRead
    • 将当前缓存区Buffer 的大小值size增加 bytesRead
    • byteCount的值减去bytesRead,因为已经读取了bytesRead个字节数据。

后来我将这个问题反馈给了官方,官方给了我下面的问答:


4.1.2 向输出流中写入数据

  /** Write the contents of this to {@code out}. */
public final Buffer writeTo(OutputStream out) throws IOException {
return writeTo(out, size);
}

/** Write {@code byteCount} bytes from this to {@code out}. */
public final Buffer writeTo(OutputStream out, long byteCount) throws IOException {
if (out == null) throw new IllegalArgumentException("out == null");
checkOffsetAndCount(size, 0, byteCount);

Segment s = head;
while (byteCount > 0) {
int toCopy = (int) Math.min(byteCount, s.limit - s.pos);
out.write(s.data, s.pos, toCopy);

s.pos += toCopy;
size -= toCopy;
byteCount -= toCopy;

if (s.pos == s.limit) {
Segment toRecycle = s;
head = s = toRecycle.pop();
SegmentPool.recycle(toRecycle);
}
}

return this;
}

将缓存区Buffer 中的byteCount 个字节数据写入输出流out 中。
方法流程

  • 先检查当前缓存区Buffer 中是否有足够数据,即 size 的大小。
  • 从链表头片段head 开始读取,通过byteCount > 0 来判断是否已经向输入流中写入足够数据。
  • toCopy 这次循环写入字节数据大小。
  • out.write(s.data, s.pos, toCopy) 向输出流out 写入toCopy 个字节数据。
  • 写入数据成功后,那么就需要改变:
    • 当前片段spos 值,增加toCopy,表示这些字节数据已经被读取。
    • 当前缓存区size值要减去toCopy,因为缓存区的toCopy 个字节数据已经写入到输出流中。
    • byteCount要减去toCopy,因为有toCopy个数据写入到输出流中了。
  • 判断 s.pos == s.limit, 表示当前片段s 中的有效数据是否都被读取完了,那么就要回收当前片段s

4.1.3 copy数据到输出流

 /** Copy the contents of this to {@code out}. */
public final Buffer copyTo(OutputStream out) throws IOException {
return copyTo(out, 0, size);
}

/**
* Copy {@code byteCount} bytes from this, starting at {@code offset}, to
* {@code out}.
*/

public final Buffer copyTo(OutputStream out, long offset, long byteCount) throws IOException {
if (out == null) throw new IllegalArgumentException("out == null");
checkOffsetAndCount(size, offset, byteCount);
if (byteCount == 0) return this;

// Skip segments that we aren't copying from.
Segment s = head;
for (; offset >= (s.limit - s.pos); s = s.next) {
offset -= (s.limit - s.pos);
}

// Copy from one segment at a time.
for (; byteCount > 0; s = s.next) {
int pos = (int) (s.pos + offset);
int toCopy = (int) Math.min(s.limit - pos, byteCount);
out.write(s.data, pos, toCopy);
byteCount -= toCopy;
offset = 0;
}
return this;
}

将缓存区Bufferoffset 位置开始,复制 byteCount 个字节数据到输出流中。
方法流程

  • 先通过checkOffsetAndCount 方法判断,复制的数据是否超过边界或者超过缓存区储存数据的大小。
  • 通过for 循环找到, offset 在链表中的片段s
  • 再使用一个 for 循环,将缓存区Bufferoffset 位置的byteCount 个字节数据到输出流out中。

4.2 与其他缓存区Buffer 的交互

缓存区也可以从其他缓存区中读取字节数据,存储到当前缓存区中;
也可以将当前缓存中存储的字节数据写入到其他缓存区中。

还记得SourceSink 类么?

4.2.1 向当前缓存区中写入数据

  @Override public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
checkOffsetAndCount(source.size, 0, byteCount);

while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
if (byteCount < (source.head.limit - source.head.pos)) {
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
source.head = source.head.split((int) byteCount);
}
}

// Remove the source's head segment and append it to our tail.
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();
}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}

从其他缓存区source 中读取byteCount个字节数据写入到当前缓存区中。

方法流程

  • 先检查需要写入的字节数据大小byteCount 是否超过了来源缓存区source 的大小。
  • 通过while (byteCount > 0) 循环,保证一定写入byteCount个字节数据。
  • 如果写入字节数据个数byteCount 小于来源缓存区source 的链表头片段中有效数据大小(source.head.limit - source.head.pos)。
    • 获取当前缓存区的链表尾片段tail
    • 如果链表尾片段tail不为null,可以追加数据(tail.owner == true),并且有足够的位置写入 byteCount 个字节数据。
      • 调用片段SegmentwriteTo 方法,将来源缓存区source 的链表头片段 headbyteCount 个字节数据写入当前缓存区的链表尾片段tail中。
      • 改变来源缓存区source 和当前缓存区的size 值,然后直接方法返回。
    • 如果链表尾片段tail不满足上诉条件,那么使用片段Segmentsplit 方法,将来源缓存区source 的链表头片段 head 根据 byteCount 值分裂成两个片段。
  • 如果写入字节数据个数byteCount 大于或者等于来源缓存区source 的链表头片段中有效数据大小。
    • 从来源缓存区source 的片段链表中移除头片段head
    • 如果当前缓存区headnull,那么直接将移除头片段head赋值给它。
    • 如果当前缓存区head 不为null,得到链表尾片段tail,调用片段tailpush 方法,将移除头片段head直接插入链表尾,最后调用片段tailcompact 方法,来合并一下小片段。
    • 最后改变来源缓存区source 和当前缓存区的size 值,以及更新需要写入字节数据个数byteCount 的值。

这个方法已经分析完毕了,你会发现两个缓存区数据迁移的时候,大部分情况下,都是直接片段迁移,而不需要复制字节数据,所以非常快速高效。

4.2.2 从当前缓存区读取数据

  @Override public long read(Buffer sink, long byteCount) {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (size == 0) return -1L;
if (byteCount > size) byteCount = size;
sink.write(this, byteCount);
return byteCount;
}

从当前缓存区中读取 byteCount个字节数据,写入到缓存区sink 中。
你会发现方法中直接调用了 sink.write(this, byteCount) 来实现功能。

4.2.3 将当前缓存区内复制到其他缓存区

  /** Copy {@code byteCount} bytes from this, starting at {@code offset}, to {@code out}. */
public final Buffer copyTo(Buffer out, long offset, long byteCount) {
if (out == null) throw new IllegalArgumentException("out == null");
checkOffsetAndCount(size, offset, byteCount);
if (byteCount == 0) return this;

out.size += byteCount;

// Skip segments that we aren't copying from.
Segment s = head;
for (; offset >= (s.limit - s.pos); s = s.next) {
offset -= (s.limit - s.pos);
}

// Copy one segment at a time.
for (; byteCount > 0; s = s.next) {
Segment copy = s.sharedCopy();
copy.pos += offset;
copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
if (out.head == null) {
out.head = copy.next = copy.prev = copy;
} else {
out.head.prev.push(copy);
}
byteCount -= copy.limit - copy.pos;
offset = 0;
}

return this;
}

将当前缓存区从offset位置开始的byteCount 个字节数据复制到缓存区out 中。
方法大体流程

  • 先通过 for 循环,找到 offset 位置属于当前缓存区中那个片段s
  • 再通过 for 循环,将当前缓存区从 offset 位置开始的byteCount 个字节数据复制到缓存区out 中。

4.3 与字节数组的交互

4.3.1 将字节数组中的数据写入缓存区

  @Override public Buffer write(byte[] source) {
if (source == null) throw new IllegalArgumentException("source == null");
return write(source, 0, source.length);
}

@Override public Buffer write(byte[] source, int offset, int byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
checkOffsetAndCount(source.length, offset, byteCount);

int limit = offset + byteCount;
while (offset < limit) {
Segment tail = writableSegment(1);

int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
System.arraycopy(source, offset, tail.data, tail.limit, toCopy);

offset += toCopy;
tail.limit += toCopy;
}

size += byteCount;
return this;
}

将字节数组source 的一部分写入到当前缓存区中。
仔细分析方法:

  • 通过 while (offset < limit) 循环,保证想要写入的字节数byteCount 都能写完。
  • 通过writableSegment(1) 方法得到当前缓存区链表尾片段tail
  • 使用 System.arraycopy 方法,进行底层字节数组的复制。

4.3.2 将缓存区字节数据读取到字节数组中

  @Override public int read(byte[] sink) {
return read(sink, 0, sink.length);
}
@Override public int read(byte[] sink, int offset, int byteCount) {
checkOffsetAndCount(sink.length, offset, byteCount);

Segment s = head;
if (s == null) return -1;
int toCopy = Math.min(byteCount, s.limit - s.pos);
System.arraycopy(s.data, s.pos, sink, offset, toCopy);

s.pos += toCopy;
size -= toCopy;

if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
return toCopy;
}

将当前缓存区一定数量字节数据写入到字节数组sink 中。
方法分析:

  • 获取当前缓存区的链表头head,如果为null,说明链表中没有数据,直接返回 -1 ,表示没有读取到任何数据。
  • 如果head不为null,通过 System.arraycopy 方法,将链表头head中数据复制到字节数组sink 中。
  • 如果当前链表头片段head 数据被读取完了(s.pos == s.limit),那么就将这个片段回收。

4.4 小结

Buffer 类中还有很多方法,大家自己分析就可以了。
你只要记住,Buffer 就是一个内存中存储字节数据的类,它可以动态地扩展存储空间的大小,并且有高效的内存回收和复用。

五. BufferedSource 和 BufferedSink

我们知道 Source 代表输入流,Sink 代表输出流,它们分别有:

  • Sourceread(Buffer sink, long byteCount) 将输入流Source 中的数据读取到缓存区sink中。
  • Sinkwrite(Buffer source, long byteCount) 从缓存区source 读取数据写入到输出流中。

但是你想用它们的时候, 还需要自己创建一个缓存区Buffer,才能使用。而且它们提供的方法也的确太有限了,为此,Okio 提供了 BufferedSourceBufferedSink 两个类,让我们方便使用输入流和输出流。

5.1 BufferedSource

对于输入流,可能我们有下面要求:

  • 从输入流中直接读取一个 Int Short Long ... 数据。
  • 从输入流中读取字符串
  • 从输入流中直接读取一行数据
  • 查找某个字节在输入流中的位置
  • 从输入流中读取数据存入字节数组中
  • 从输入流中读取数据,并返回获取数据组成的字节数组

BufferedSource 中还有很多其他的方法,大家自己看一下。

5.2 BufferedSink

对于输出流,可能我们有下面要求:

  • 向输出流中直接写入一个 Int Short Long ... 数据
  • 向输出流中写入字节数组
  • 向输出流中写入字符串
  • 向输出流中写入 ByteString

BufferedSink 中还有很多其他的方法,大家自己看一下。

六. RealBufferedSource 和 RealBufferedSink

它们是BufferedSourceBufferedSink的实现类。
SourceSink 进行缓存区包装,什么意思呢?

6.1 RealBufferedSource

6.1.1 成员属性

  public final Buffer buffer = new Buffer();
public final Source source;
boolean closed;
  • buffer:拥有的缓存区,即每一个 RealBufferedSource 都会自动生成一个缓存区buffer对象。
  • source:包裹的输入流对象source
  • closed:表示当前流RealBufferedSource是否关闭。

6.1.2 构造方法

  RealBufferedSource(Source source) {
if (source == null) throw new NullPointerException("source == null");
this.source = source;
}

只有一个构造函数,必须包裹一个输入流 source,因为RealBufferedSource 就是辅助输入流 source存在的。

6.1.3 read(Buffer sink, long byteCount)

  @Override public long read(Buffer sink, long byteCount) throws IOException {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");

if (buffer.size == 0) {
long read = source.read(buffer, Segment.SIZE);
if (read == -1) return -1;
}

long toRead = Math.min(byteCount, buffer.size);
return buffer.read(sink, toRead);
}

这个是 Source 类的方法,表示从输入流中读取一定数量的字节数据存入缓存区sink中。
方法分析

  • 通过 source.read(buffer, Segment.SIZE) 方法,从包裹输入流 source 中读取 Segment.SIZE 数据,存入当前 RealBufferedSource 拥有的缓存区buffer 中。
  • 通过 buffer.read(sink, toRead) 将当前 RealBufferedSource 拥有的缓存区buffer 中数据写入 另一个缓存区sink 中。

6.1.4 require(long byteCount) 和 request(long byteCount)

  @Override public void require(long byteCount) throws IOException {
if (!request(byteCount)) throw new EOFException();
}

@Override public boolean request(long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
while (buffer.size < byteCount) {
if (source.read(buffer, Segment.SIZE) == -1) return false;
}
return true;
}

这两个方法作用就是当前缓存区buffer 的数据不够 byteCount 的大小了,要从包裹输入流 source 读取一定的数据。

6.1.5 read 字节数组的方法

  @Override public int read(byte[] sink) throws IOException {
return read(sink, 0, sink.length);
}

@Override public int read(byte[] sink, int offset, int byteCount) throws IOException {
checkOffsetAndCount(sink.length, offset, byteCount);

if (buffer.size == 0) {
long read = source.read(buffer, Segment.SIZE);
if (read == -1) return -1;
}

int toRead = (int) Math.min(byteCount, buffer.size);
return buffer.read(sink, offset, toRead);
}

你会发现和 read(Buffer sink, long byteCount) 方法相似,从包裹输入流 source 读取数据存入当前缓存区buffer 中,然后再通过缓存区的read 方法,将数据写入到字节数组 sink 中。

RealBufferedSource 剩下的方法,请大家自行分析。

6.2 RealBufferedSink

6.2.1 成员属性

  public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
  • buffer:拥有的缓存区,即每一个RealBufferedSink都会自动生成一个缓存区buffer对象。
    *sink:包裹的输出流对象sink
    *closed:表示当前流RealBufferedSink是否关闭。

6.2.2 构造方法

  RealBufferedSink(Sink sink) {
if (sink == null) throw new NullPointerException("sink == null");
this.sink = sink;
}

只有一个构造函数,必须包裹一个输出流sink,因为RealBufferedSink就是辅助输出流sink存在的。

6.2.3 write(Buffer source, long byteCount)

  @Override public void write(Buffer source, long byteCount)
throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source, byteCount);
emitCompleteSegments();
}

这个是 Sink 类的方法,表示将来源缓存区source 中的byteCount 个字节数据写入到输出流中。
方法分析

  • 通过缓存区Bufferwrite(Buffer source, long byteCount) 方法,将来源缓存区source 的数据写入到当前缓存区buffer中。
  • 通过 emitCompleteSegments 方法,判断是否向包裹的输出流对象sink中写入数据。

6.2.4 emitCompleteSegments 方法

  @Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
  • buffer.completeSegmentByteCount() 返回的是当前缓存区buffer 中,不可再写入片段字节数据个数的和。
  • 如果 byteCount > 0 ,那么就将那些不可写入片段的数据都写入包裹输出流sink 中。

6.2.4 write 字节数组

  @Override public BufferedSink write(byte[] source) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source);
return emitCompleteSegments();
}

@Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source, offset, byteCount);
return emitCompleteSegments();
}

都是向缓存区buffer 中写入数据,最后调用 emitCompleteSegments 方法,看是否向包裹输出流sink 中。

RealBufferedSink 剩下的方法,请大家自行分析。

七. Okio

这其实是一个工具类,方便我们来获取 SourceSink

7.1 获取 Source

想要获取一个 Source 输出流对象,其实有很多个方法。

7.1.1 从文件中获取

  /** Returns a source that reads from {@code file}. */
public static Source source(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return source(new FileInputStream(file));
}

7.1.2 从输入流中获取

  /** Returns a source that reads from {@code in}. */
public static Source source(InputStream in) {
return source(in, new Timeout());
}

private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");

return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try {
timeout.throwIfReached();
Segment tail = sink.writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) {
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
sink.head = tail.pop();
SegmentPool.recycle(tail);
}
return -1;
}
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}

@Override public void close() throws IOException {
in.close();
}

@Override public Timeout timeout() {
return timeout;
}

@Override public String toString() {
return "source(" + in + ")";
}
};
}

可以看到创建了Source 一个内部类,通过的输入流的 in.read(tail.data, tail.limit, maxToCopy) 方法获取数据,写入到缓存区sink 中。

7.1.3 从路径中获取

  public static Source source(Path path, OpenOption... options) throws IOException {
if (path == null) throw new IllegalArgumentException("path == null");
return source(Files.newInputStream(path, options));
}

7.1.4 从 socket 中获取

  public static Source source(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
if (socket.getInputStream() == null) throw new IOException("socket's input stream == null");
AsyncTimeout timeout = timeout(socket);
Source source = source(socket.getInputStream(), timeout);
return timeout.source(source);
}

7.2 获取 Sink

7.2.1 从文件中获取

  /** Returns a sink that writes to {@code file}. */
public static Sink sink(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return sink(new FileOutputStream(file));
}

7.2.2 从输出流中获取

  /** Returns a sink that writes to {@code out}. */
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}

private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");

return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);

head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;

if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}

@Override public void flush() throws IOException {
out.flush();
}

@Override public void close() throws IOException {
out.close();
}

@Override public Timeout timeout() {
return timeout;
}

@Override public String toString() {
return "sink(" + out + ")";
}
};
}

可以看到创建了Sink一个内部类,从来源缓存区source 中获取字节数据,通过输出流的out.write(head.data, head.pos, toCopy) 写入。

7.2.3 从路径中获取

  /** Returns a sink that writes to {@code path}. */
@IgnoreJRERequirement // Should only be invoked on Java 7+.
public static Sink sink(Path path, OpenOption... options) throws IOException {
if (path == null) throw new IllegalArgumentException("path == null");
return sink(Files.newOutputStream(path, options));
}

7.2.4 从 socket 中获取

  public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}

7.3 获取缓存流

  public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}

public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
举报

相关推荐

0 条评论