目录
简介
Asynch HttpClient是基于HttpCore NIO和HttpClient组件的HTTP / 1.1兼容HTTP代理实现。 它是Apache HttpClient的补充模块,适用于特殊情况。在特殊情况下,就原始数据吞吐量而言,处理大量并发连接的能力比性能更为重要。
reactor模型
HttpAsyncClient 用的是reactor模型(NIO),底层是poll()。
什么是reactor?
在Reactor中,被拆分的小线程或者子过程对应的是handler,每一种handler会出处理一种event。
这里会有一个全局的管理者selector,我们需要在channel注册上对应事件,那么这个selector就会不断在channel上检测是否有该类型的事件发生。如果没有,那么主线程就会被阻塞,否则就会调用相应的事件处理函数即handler来处理。
重要参数
PoolingNHttpClientConnectionManager类下
1)defaultMaxPerRoute = 2
每一个 local IP => remoteIP : port 为一个route,在向http服务器单一(ip,port)对发送请求时,这个参数控制了可以建立的tcp连接上限。
2)maxTotal 20
连接池最大并发数。这个设置与应用的并发量有关。比如应用的1s的请求量1000,每个请求需要10ms,则需要的最大并发数为 1000/(1s/10ms)=10,理论上设置的值最好比这个稍大一点点。
总之,同一route上的http请求数量受限于 maxPerRoute, 与本地打开的、向同一对端(ip:port)的端口号数量相同。每一请求使用 IOSessionImpl 保存对话上下文,并附到 SelectionKey 上。
3)连接池的相关参数
org/apache/http/nio/pool/AbstractNIOConnPool.java
public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
implements ConnPool<T, E>, ConnPoolControl<T> {
// 一个可复用的ioreactor, 负责生成SessionRequest并唤醒selector去做连接到目标网站
private final ConnectingIOReactor ioreactor;
// 用来构造连接池的entry的工厂
private final NIOConnFactory<T, C> connFactory;
// 验证并生成目标连接socketAddress的类
private final SocketAddressResolver<T> addressResolver;
// 一个可复用的callBack类, 里面提供了一个调用SessionRequest的complete的方法
private final SessionRequestCallback sessionRequestCallback;
// 用域名区分的连接池
private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
// 没有成功拿到连接的请求列表
private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
// 已经拿到连接权利, 但是还没连接成功的连接集合
private final Set<SessionRequest> pending;
// 已经连接成功, 并被租借出去的连接集合
private final Set<E> leased;
// 当前连接池可用的连接集合
private final LinkedList<E> available;
// 已经连接完成, 但是不可用的连接集合, 例如因为异常连接失败等待, 他们会在队列中等待被调用回调方法做后续处理
private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
// 每个route的最大连接数
private final Map<T, Integer> maxPerRoute;
// 锁对象
private final Lock lock;
// 是否关闭
private final AtomicBoolean isShutDown;
// 每个route最大连接数默认值
private volatile int defaultMaxPerRoute;
// 整个连接池最大连接数
private volatile int maxTotal;
}org/apache/http/nio/pool/AbstractNIOConnPool.java
public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
implements ConnPool<T, E>, ConnPoolControl<T> {
// 一个可复用的ioreactor, 负责生成SessionRequest并唤醒selector去做连接到目标网站
private final ConnectingIOReactor ioreactor;
// 用来构造连接池的entry的工厂
private final NIOConnFactory<T, C> connFactory;
// 验证并生成目标连接socketAddress的类
private final SocketAddressResolver<T> addressResolver;
// 一个可复用的callBack类, 里面提供了一个调用SessionRequest的complete的方法
private final SessionRequestCallback sessionRequestCallback;
// 用域名区分的连接池
private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
// 没有成功拿到连接的请求列表
private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
// 已经拿到连接权利, 但是还没连接成功的连接集合
private final Set<SessionRequest> pending;
// 已经连接成功, 并被租借出去的连接集合
private final Set<E> leased;
// 当前连接池可用的连接集合
private final LinkedList<E> available;
// 已经连接完成, 但是不可用的连接集合, 例如因为异常连接失败等待, 他们会在队列中等待被调用回调方法做后续处理
private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
// 每个route的最大连接数
private final Map<T, Integer> maxPerRoute;
// 锁对象
private final Lock lock;
// 是否关闭
private final AtomicBoolean isShutDown;
// 每个route最大连接数默认值
private volatile int defaultMaxPerRoute;
// 整个连接池最大连接数
private volatile int maxTotal;
}
流程设计
HttpAsyncClient有一个AbstractMultiworkerIOReactor和AbstractIOReactor, 前者和后者类似于netty的bossGroup和workerGroup。
AbstractMultiworkerIOReactor负责channel的连接, AbstractIOReactor负责channel的读写。
1) 发起请求
a. 根据请求route查看连接池, 如果连接池不为空, 直接返回跟池中connection绑定的future, 并把该conn放入leased列表
b. 如果因为某些原因导致当前请求无法取得连接, 但是没有发生致命错误的, 请求将被放入一个 leasing 列表, 这个列表会在后续动作中被取出来做连接重试
c. 如果实在连接过程中出现了移除等不可恢复的错误, 则将request标记为completed, 退出方法后调用fireCallBack, 进行回调清理, 这次请求就算是失败结束了
d. 如果是因为连接池没有可用连接, 但是可以新建连接的情况, 则会将request 加入pending列表, 并调用 selector的wakeup()方法, selector在wakeup以后会使用AbstractMultiworkerIOReactor(bossGroup)来进行连接操作, 并注册到selector中, 后续的connectable事件监听和channel连接成功注册也是由他完成的
2) AbstractIOReactor监听读写事件
3)通过decoder检测response已经完成, 最后将连接release到连接池中, 此时将连接从leased列表除去, 并加入到available中。
相关类简介
类名 | 概要 |
HttpAsyncRequestExecutor | |
InternalIODispatch | |
PoolingNHttpClientConnectionManager | |
DefaultConnectingIOReactor | 建立连接用的boss reactor,一个client只有一个 |
BaseIOReactor | 处理读写的worker reactor,一个client可以有多个 |
CPool | TCP连接池,不是线程池 |
ManagedNHttpClientConnectionImpl | 一条TCP连接 |
IOSessionImpl | 一对 HTTP Request/Response 所使用的会话上下文 attributes 中持有 ManagedNHttpClientConnectionImpl 引用等 |
class依赖关系
时序图
Main Thread
Reactor Thread图(负责 connect)
Worker Thread图(负责 read write)
源码浅谈
InternalHttpAsyncClient
org/apache/http/nio/client/HttpAsyncClient.java
/**
此接口只代表HTTP请求执行的最基本的契约。它对请求执行过程不施
加任何限制或特定细节,并将状态管理、身份验证和重定向处理的细节
留给子类实现。
*/
public interface HttpAsyncClient {
/**
使用给定上下文启动异步HTTP请求execution。
*/
<T> Future<T> execute(
HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer<T> responseConsumer,
HttpContext context,
FutureCallback<T> callback);
……
}
poll函数
DefaultConnectingIOReactor: 负责连接 remote hosts。其类相关Selector的poll由WindowsSelectorImpl类的poll0() 来实现。
Java\jdk1.8.0_261\jre\lib\rt.jar!\sun\nio\ch\WindowsSelectorImpl.class
final class WindowsSelectorImpl extends SelectorImpl {
private final int INIT_CAP = 8;
private static final int MAX_SELECTABLE_FDS = 1024;
.......
private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);
......
}
poll函数用于监测多个等待事件。
若事件未发生,进程睡眠,放弃CPU控制权;
若监测的任何一个事件发生,poll将唤醒睡眠的进程,并判断是什么等待事件发生,执行相应的操作。
缺点: 依赖fd文件。包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
然而FD_SETSIZE默认为1024。linux中的是通过文件方式来管理系统的,因此系统能承载多少TCP连接和系统文件打开数目能力是相关的。在/proc/sys/fs/file-max中定义了,系统最多能够打开的文件数目。
Future类
Future<HttpResponse> future = httpAsyncClient.execute(httpPost, futureCallback);
HttpResponse response = future.get(timeout, TimeUnit.SECONDS);
使用future的get方法设置超时,会在主线程进行阻塞,而不是每个子线程的超时。HttpAsyncClient的BasicFuture主要是实现了Future接口,通过Object.wait() 和notifyAll() 分别来阻塞和唤醒线程的执行。如下:
org/apache/http/concurrent/BasicFuture.java
public class BasicFuture<T> implements Future<T>, Cancellable {
private final FutureCallback<T> callback;
private volatile boolean completed;
private volatile boolean cancelled;
private volatile T result;
private volatile Exception ex;
public BasicFuture(final FutureCallback<T> callback) {
super();
this.callback = callback;
}
@Override
public synchronized T get(final long timeout, final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Args.notNull(unit, "Time unit");
final long msecs = unit.toMillis(timeout);
final long startTime = (msecs <= 0) ? 0 : System.currentTimeMillis();
long waitTime = msecs;
if (this.completed) {
return getResult();
} else if (waitTime <= 0) {
throw new TimeoutException();
} else {
for (;;) {
wait(waitTime);
if (this.completed) {
return getResult();
} else {
waitTime = msecs - (System.currentTimeMillis() - startTime);
if (waitTime <= 0) {
throw new TimeoutException();
}
}
}
}
}
public boolean completed(final T result) {
synchronized(this) {
if (this.completed) {
return false;
}
this.completed = true;
this.result = result;
notifyAll();
}
if (this.callback != null) {
this.callback.completed(result);
}
return true;
}
....
}
参考
官方文档:https://hc.apache.org/httpcomponents-asyncclient-4.1.x/index.html
https://blog.csdn.net/niugang0920/article/details/105076109
https://www.jianshu.com/p/6bcfd6b2bd5c
https://www.cnblogs.com/zemliu/p/3719292.html
https://blog.csdn.net/rickey17/article/details/87914737