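Guava's asynchronous enhancement: ListenableFuture
- ListenableFuture extends Future and adds a single method, addListener. The listener is the callback that runs once the task has finished, and the executor is what runs that callback (usually a thread pool). Guava's other Future enhancements are all wrappers built around addListener, which makes it the core method.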
void addListener(Runnable listener, Executor executor);
- The JDK's FutureTask is an implementation of the Future interface. Guava's ListenableFutureTask extends FutureTask and implements ListenableFuture. The simplest use of a Guava asynchronous callback:
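ListenableFutureTask<String> task = ListenableFutureTask.create(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "";
    }
});
// Run the task on a dedicated thread.
new Thread(task).start();
// The listener runs once the task is done. If the task has already finished,
// it runs immediately through the given executor.
task.addListener(new Runnable() {
    @Override
    public void run() {
        System.out.println("done");
    }
}, MoreExecutors.directExecutor());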
// For reference, the Guava source behind MoreExecutors.directExecutor():
// the command runs on whichever thread calls execute().
public static Executor directExecutor() {
    return DirectExecutor.INSTANCE;
}
private enum DirectExecutor implements Executor {
    INSTANCE;
    @Override
    public void execute(Runnable command) {
        command.run();
    }
    @Override
    public String toString() {
        return "MoreExecutors.directExecutor()";
    }
}
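- To make the listener mechanism concrete, here is a minimal JDK-only sketch of how an addListener method could be layered on FutureTask through its done() hook. MiniListenableTask and MiniListenableDemo are hypothetical names invented for this illustration, and the code is not Guava's implementation. It also shows what a direct executor implies: the listener runs on the thread that completes the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical class, not part of Guava: a FutureTask that accepts listeners.
class MiniListenableTask<V> extends FutureTask<V> {
    private final List<Runnable> pending = new ArrayList<>(); // guarded by this
    private boolean finished = false;                          // guarded by this

    MiniListenableTask(Callable<V> callable) {
        super(callable);
    }

    // Same shape as ListenableFuture.addListener.
    void addListener(Runnable listener, Executor executor) {
        synchronized (this) {
            if (!finished) {
                // Not done yet: remember the listener together with its executor.
                pending.add(() -> executor.execute(listener));
                return;
            }
        }
        // Already done: hand the listener to its executor right away.
        executor.execute(listener);
    }

    // FutureTask calls done() once the task has completed or been cancelled.
    @Override
    protected void done() {
        List<Runnable> toRun;
        synchronized (this) {
            finished = true;
            toRun = new ArrayList<>(pending);
            pending.clear();
        }
        toRun.forEach(Runnable::run);
    }
}

class MiniListenableDemo {
    // Returns the name of the thread the listener ran on.
    static String listenerThreadName() {
        MiniListenableTask<String> task = new MiniListenableTask<>(() -> "value");
        String[] seen = new String[1];
        Executor direct = Runnable::run; // behaves like MoreExecutors.directExecutor()
        task.addListener(() -> seen[0] = Thread.currentThread().getName(), direct);
        Thread worker = new Thread(task, "worker-1");
        worker.start();
        try {
            worker.join(); // join() also makes the listener's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("listener ran on: " + listenerThreadName()); // worker-1
    }
}
```

Because the listener was registered before the task finished and the executor is direct, the callback runs on worker-1 rather than on the main thread. A slow listener therefore occupies the thread that completed the task.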
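- In practice, asynchronous code submits tasks to a thread pool rather than starting a new thread for each task as above, which is too inefficient. So it is worth looking at how Guava wraps the JDK thread pools. All of these enhancements live in the MoreExecutors class. ExecutorService and ScheduledExecutorService are enhanced in similar ways, so only ExecutorService is covered here.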
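ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
        5,
        5,
        0,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(100),
        // CustomizableThreadFactory comes from Spring; "demo" is the thread name prefix.
        new CustomizableThreadFactory("demo"),
        // DiscardPolicy silently drops tasks that are rejected once the queue is full.
        new ThreadPoolExecutor.DiscardPolicy());
// Wrap the pool so that submit() returns a ListenableFuture.
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor);
// Convert the pool to daemon threads plus a shutdown hook, so it does not keep the JVM alive.
ExecutorService newPoolExecutor = MoreExecutors.getExitingExecutorService(poolExecutor);
// Register a shutdown hook that waits up to 120 seconds for the pool's tasks to finish.
MoreExecutors.addDelayedShutdownHook(poolExecutor, 120, TimeUnit.SECONDS);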
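- As a rough illustration of the "exiting" idea, the JDK-only sketch below combines daemon worker threads with a shutdown hook that waits a bounded time for outstanding tasks. ExitingPoolSketch, exitingPool and runsOnDaemonThread are hypothetical names, and this is an approximation of the behaviour described in the MoreExecutors Javadoc, not Guava's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Guava code.
class ExitingPoolSketch {
    static ExecutorService exitingPool(int threads, long wait, TimeUnit unit) {
        // Daemon threads do not keep the JVM alive on their own.
        ThreadFactory daemonFactory = runnable -> {
            Thread t = new Thread(runnable);
            t.setDaemon(true);
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(threads, daemonFactory);
        // On JVM shutdown: stop accepting work, then wait a bounded time for tasks to finish.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            pool.shutdown();
            try {
                pool.awaitTermination(wait, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        return pool;
    }

    // Reports whether a task submitted to the pool runs on a daemon thread.
    static boolean runsOnDaemonThread() {
        ExecutorService pool = exitingPool(2, 5, TimeUnit.SECONDS);
        Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
        try {
            return pool.submit(probe).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("daemon worker: " + runsOnDaemonThread()); // daemon worker: true
    }
}
```

The main method returns without an explicit shutdown() call, yet the JVM can still exit because the workers are daemon threads. The hook gives tasks that are still running a limited time to finish.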
- With the pieces above in place, Guava's asynchronous callbacks can be used properly:
ListenableFuture<String> listenableFuture = listeningExecutor.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "";
    }
});
// Recent Guava releases require an explicit executor for the callback.
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
    }
    @Override
    public void onFailure(Throwable t) {
    }
}, MoreExecutors.directExecutor());
// Guava source of Futures.addCallback: the callback is wrapped in a listener
// and registered through addListener.
public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) {
    Preconditions.checkNotNull(callback);
    Runnable callbackListener =
        new Runnable() {
            @Override
            public void run() {
                final V value;
                try {
                    value = getDone(future);
                } catch (ExecutionException e) {
                    callback.onFailure(e.getCause());
                    return;
                } catch (RuntimeException e) {
                    callback.onFailure(e);
                    return;
                } catch (Error e) {
                    callback.onFailure(e);
                    return;
                }
                callback.onSuccess(value);
            }
        };
    future.addListener(callbackListener, executor);
}
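- The success/failure dispatch in addCallback can be tried out with the JDK alone. The sketch below uses hypothetical names (CallbackDispatchSketch, Callback, dispatch, run) and triggers the dispatch from FutureTask.done(). It unwraps ExecutionException in the same way, so onFailure receives the original exception thrown by the task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical sketch, not Guava code.
class CallbackDispatchSketch {
    // Stand-in for Guava's FutureCallback.
    interface Callback<V> {
        void onSuccess(V result);
        void onFailure(Throwable t);
    }

    // Reads the result of an already finished future and routes it to the callback.
    static <V> void dispatch(Future<V> finished, Callback<? super V> callback) {
        final V value;
        try {
            value = finished.get(); // returns at once because the future is done
        } catch (ExecutionException e) {
            callback.onFailure(e.getCause()); // unwrap: report what the task threw
            return;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            callback.onFailure(e);
            return;
        } catch (RuntimeException | Error e) {
            callback.onFailure(e); // for example CancellationException
            return;
        }
        callback.onSuccess(value);
    }

    // Runs the work on the calling thread and returns what the callback recorded.
    static String run(Callable<String> work) {
        StringBuilder log = new StringBuilder();
        Callback<String> callback = new Callback<String>() {
            @Override
            public void onSuccess(String result) {
                log.append("success:").append(result);
            }
            @Override
            public void onFailure(Throwable t) {
                log.append("failure:").append(t.getMessage());
            }
        };
        FutureTask<String> task = new FutureTask<String>(work) {
            @Override
            protected void done() {
                dispatch(this, callback); // done() fires after completion
            }
        };
        task.run();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "ok"));                                       // success:ok
        System.out.println(run(() -> { throw new IllegalStateException("boom"); })); // failure:boom
    }
}
```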
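- Guava also provides methods for chaining several asynchronous tasks. Written with addListener alone, a chain looks roughly like this, nesting one level deeper with every step: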
ListenableFutureTask<String> task1 = ListenableFutureTask.create(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "";
    }
});
new Thread(task1).start();
task1.addListener(new Runnable() {
    @Override
    public void run() {
        ListenableFutureTask<String> task2 = ListenableFutureTask.create(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "";
            }
        });
        task2.addListener(new Runnable() {
            @Override
            public void run() {
                ...
            }
        }, MoreExecutors.directExecutor());
        new Thread(task2).start();
    }
}, MoreExecutors.directExecutor());
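// The same chain written with Futures.transform stays flat.
// Recent Guava releases require an explicit executor for transform and addCallback.
ListenableFuture<String> task2 = Futures.transform(task1, new Function<String, String>() {
    @Override
    public String apply(String input) {
        return "";
    }
}, MoreExecutors.directExecutor());
ListenableFuture<String> task3 = Futures.transform(task2, new Function<String, String>() {
    @Override
    public String apply(String input) {
        return "";
    }
}, MoreExecutors.directExecutor());
Futures.addCallback(task3, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
    }
    @Override
    public void onFailure(Throwable t) {
    }
}, MoreExecutors.directExecutor());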
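- Futures.transform() and Futures.transformAsync() differ in the type of their function argument: the former takes a Function, the latter an AsyncFunction. The apply method of AsyncFunction itself returns a ListenableFuture, which means the transformation step is asynchronous as well.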
// Demo: with a direct executor, the Function passed to transform runs on the
// thread that completes task1, while the main thread keeps printing.
ListenableFutureTask<String> task1 = ListenableFutureTask.create(new Callable<String>() {
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        System.out.println("task1 over" + new Date());
        return "";
    }
});
new Thread(task1).start();
ListenableFuture<String> transform = Futures.transform(task1, new Function<String, String>() {
    @Override
    public String apply(String input) {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("trans over" + new Date());
        System.out.println("trans over" + Thread.currentThread());
        return "";
    }
}, MoreExecutors.directExecutor()); // recent Guava releases require the executor argument
// The main thread is not blocked by either task.
while (true) {
    TimeUnit.MILLISECONDS.sleep(200);
    System.out.println(new Date().toString() + Thread.currentThread());
}
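- The difference between the two transformation styles can be sketched on top of a bare addListener. MiniFuture and TransformSketchDemo below are hypothetical, JDK-only stand-ins rather than Guava classes, and error handling is left out for brevity. In transform the function returns a plain value, so the derived future completes as soon as the function returns. In transformAsync the function returns another future, and the derived future completes only when that inner future does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical minimal future, not Guava's: a value slot plus addListener.
class MiniFuture<V> {
    private final List<Runnable> listeners = new ArrayList<>(); // guarded by this
    private boolean done;                                       // guarded by this
    private V value;                                            // guarded by this

    void set(V v) {
        List<Runnable> toRun;
        synchronized (this) {
            if (done) return; // only the first completion counts
            value = v;
            done = true;
            toRun = new ArrayList<>(listeners);
            listeners.clear();
        }
        toRun.forEach(Runnable::run);
    }

    synchronized boolean isDone() {
        return done;
    }

    synchronized V getDone() {
        if (!done) throw new IllegalStateException("not done yet");
        return value;
    }

    void addListener(Runnable listener, Executor executor) {
        synchronized (this) {
            if (!done) {
                listeners.add(() -> executor.execute(listener));
                return;
            }
        }
        executor.execute(listener);
    }

    // transform: the function returns a plain value.
    <R> MiniFuture<R> transform(Function<? super V, ? extends R> fn, Executor executor) {
        MiniFuture<R> out = new MiniFuture<>();
        addListener(() -> out.set(fn.apply(getDone())), executor);
        return out;
    }

    // transformAsync: the function returns a future; out completes when that future does.
    <R> MiniFuture<R> transformAsync(Function<? super V, MiniFuture<R>> fn, Executor executor) {
        MiniFuture<R> out = new MiniFuture<>();
        addListener(() -> {
            MiniFuture<R> inner = fn.apply(getDone());
            inner.addListener(() -> out.set(inner.getDone()), Runnable::run);
        }, executor);
        return out;
    }
}

class TransformSketchDemo {
    static String run() {
        Executor direct = Runnable::run;
        MiniFuture<String> task1 = new MiniFuture<>();
        MiniFuture<String> slowStep = new MiniFuture<>(); // stands in for a second async task
        MiniFuture<Integer> length = task1.transform(String::length, direct);
        MiniFuture<String> chained =
                length.transformAsync(n -> slowStep.transform(s -> s + n, direct), direct);
        task1.set("guava"); // completes task1 and length, but not chained
        boolean doneBeforeInner = chained.isDone();
        slowStep.set("len="); // the inner future finishes, so chained finishes too
        return doneBeforeInner + "," + chained.getDone();
    }

    public static void main(String[] args) {
        System.out.println(run()); // false,len=5
    }
}
```

The output shows that chained is still pending after task1 and length have completed. It finishes only once the future returned by the async function has been set.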