Executor、ExecutorService、Executors、ThreadPoolExecutor、Future、Runnable、Callable

阅读 82

2022-04-19

Executor

Executor类的实例对象是一个执行已提交的任务的对象。
该接口提供了一种将任务的提交与每个任务的运行机制分离的方法,包括线程使用细节、调度细节等。

通常将为任务显示地创建线程替换为使用Executor来执行任务。比如,当有多个线程任务时,为每个任务创建一个线程并启动new Thread(new Runnable(){}).start()的代码,可以替换为:

Executor executor = anExecutor;
executor.execute(new Runnable(){});
executor.execute(new Runnable(){});

但是,Executor并没有严格要求任务的执行必须是异步的。最简单的情况,是Executor可以在应用execute(runnable)方法的线程中立即运行提交的任务:

class DirectExecutor implements Executor{
	public void execute(Runnable r){
		r.run();
	}
}

更为典型的情况则是,Executor会在一些其他线程中运行提交的任务、而非在调用execute®方法的线程中。以下代码示例的是一个为每一个任务创建一个新的线程的执行器:

class ThreadPerTaskExecutor implements Executor{
	public void execute(Runnable r){
		new Thread(r).start();
	}
}

许多Executor的实现类会给任务的执行方式和时间做一些限制和要求。以下代码所实现的一个执行器会将提交上来的所有任务序列化地在提交给另一个Executor实例,说明这是一个复合执行器:

class SerialExecutor implements Executor{
	Executor executor;
	final Queue<Runanble> tasks = new ArrayDeque<>();
	Runnable active;

	public SerialExecutor(Executor executor){
		this.executor = executor;
	}
	public synchronized void execute(Runnable r){
		tasks.offer(r->{	// Queue.offer(E e);——添加一个元素到队尾,然后检查如果队列已满则自动扩容
			try{
				r.run();
			}finally{
				scheduleNext();
			}			
		});	
		if(active == null)		{	scheduleNext();		}					
	}

	protected synchronized void scheduleNext(){
		if((active = tasks.poll())!=null)		{	executor.execute(active);		}
	}
}

SerialExecutor的execute方法首先重新封装提交给它的任务r,重新封装出的新任务的run()方法代码逻辑为:

  1. 首先会执行原来的任务的代码r.run(),
  2. 然后自动从task队列中poll一个封装后的新任务,并调用另一个executor.execute®方法执行这个新任务

重新封装好提交来的任务后,会检查executor是否正在执行任务,如果没有,就调用scheduleNext()来启动executor执行新任务。

SerialExecutor实现了对提交来的任务的串行化执行。由于scheduleNext()方法是protected的,因此外部方法无法直接调用它来并行地启动另外一个线程。而SerialExecutor对通过execute®方法提交来的任务做了新的封装,使得只有在一个任务运行完毕后才会调用另一个executor的方法运行新的任务。

那么当多次调用SerialExecutor的execute®方法时,会出现什么情况呢?例如在同一个线程中,连续数次调用这个方法,synchronized关键字是可重入锁,execute®方法也会被多次执行,但是其执行的是封装新任务的代码,并非是多次启动另一个executor.execute®方法的代码,启动另一个executor的代码只有在active== null的时候才会执行,那么还是能确保任务的串行化执行。

另外,tasks则相当于用一个queue对任务做了一个缓存,由于防止另外的executor来不及处理提交的任务。并使用synchronized以防多个线程同时访问queue时可能出现数据错误。

同一个package内的另一个接口ExecutorService,继承了Executor接口,更广为使用。
ThreadPoolExecutor类则是一个可供应用程序开发人员扩展的Executor实现类。
Executors类则为以上这些实现类提供了遍历的工厂方法。

Memory consistency effects(内存一致性影响): Actions in a thread prior to submitting a {@code Runnable} object to an {@code Executor} happen-before its execution begins, perhaps in another thread.

public interface Executor{
	/**
	* 在未来某个时间执行给定的r。命令可能在一个新线程中执行、或者在一个线程池中执行、或者就在调用线程中直接执行
	* 取决于实现类的自行决定。
	*/
	void execute(Runnable r);
}

ExecutorService

执行组件服务:Executor的实现类,提供了管理终止的方法和为异步任务生成能跟踪运行过程的Future的方法。
ExecutorService能提供了关闭接口,对executorService对象应用了关闭方法后,executorService就会拒绝接受新任务。
当executorService终止后,没有正在运行的任务,没有等待执行的任务,新任务也不能提交了。应当对终止后的executorService应用shutdown方法,以允许回收其资源。

submit方法扩展了Executor.execute方法,它会创建并返回一个Future,应用这个Future可撤销任务或等待任务执行完成。

invokeAny & invokeAll 方法是最常用的执行批量任务的方法,会执行一个集合中的任务,并等待至少一个或者所有任务的完成。

应用程序开发者可以扩展ExecutorCompleionService类,以自定义以上方法。

类Executors提供了创建执行组件的工厂方法。

以下代码展示了一个使用线程池中的线程为网络请求提供服务的简单网络服务器。它使用已经设定好了各种配置的Executors.newFixedThreadPool工厂方法:

public class NetworkService implements Runnable{
	private final ExecutorService pool;
	private final ServerSocket serverSocket;

	public NetworkServece(int port, int poolSize){
		serverSocket = new ServerSocket(port);
		pool = Executors.newFixedThreadPool(poolSize);
	}
	

	public void run(){
		try{
			for(;;){
				pool.submit(new Handler(serverSocket.accept()));
			}
		}catch(Exception e ){
			 pool.shutdown(); 
		}		
	}	
}
class Handler implements Runnable{
	final Socket socket;
	public Handler(Socket socket){
		this.socket = socket;
	}
	public void run(){
		// read and service request on socket
	}
}

以下代码对一个ExecutorService对象实施两阶段的关闭,以确保能撤销时间较长的任务。先对executorService对象应用shutdown方法,拒绝接受新任务;再对其应用shutdowNow方法,以撤销时间过长的任务。

void shutDownAndAwaitTermination(ExecutorService pool){
	pool.shutdown();		// 停止接收新任务
	try{
		if(!pool.awaitTermination(60, TimeUnit.SECONDS){	// 给正在执行的任务和等待执行的任务60秒的时间。
			pool.shutdownNow();		// 60秒后仍有任务未完成,则直接撤销那些正在执行的或者等待执行的任务
			if(!pool.awaitTermination(60, TimeUnit.SECONDS){	// 再等60秒给任务响应撤销请求
				System.err.println("pool did not terminate");		// 仍有任务未能成功撤销
			}
		}
	}catch(InterruptedException e){
		pool.shutdownNow();		// 如果当前线程也被中断了,仍然执行关闭
		Thread.currentThread().interrupt();  // 保持当前线程的中断状态
	}finally{}
}

内存一致性效果:向ExecutorService提交一个Runnable任务或者Callable任务的动作一定发生在ExecutorService运行这个任务的动作之前,而运行这个任务的动作一定又发生在通过Future.get()方法得到这个任务的运行结果之前。

public interface ExecutorService extends Executor{
	// 启动一个有序的关闭,停止接收新任务,将所有已提交的任务执行完毕。但不会阻塞等待所有已提交的任务全部执行完。如果需要,使用awaitTermination()。
	// 对于已经关闭的ExecutorService没有任何影响。
	// throws SecurityException,如果设置了security manger?
	void shutdown();

	// 停止所有真正运行的任务,停止对等待执行的任务的处理
	// 返回等待执行的任务列表
	List<Runnable> shutdownNow();
	
	// 如果已经关闭,返回true
	boolean isShutdown();

	// 如果在应用shutdown()方法后,所有任务都已执行完毕。则返回true
	// 如果在调用这个方法前,没有先调用shutdown方法,那么永远不可能返回true。
	boolean isTerminated();

	// 阻塞当前线程,直到ExecutorService接收的所有任务都执行完毕、或者超时、或者被interrupt中断
	// 如果是所有任务都执行完毕后返回的,返回true;如果是超时返回的,返回false;如果被中断的,抛出InterruptedException异常
	boolean awaitTermination(long timeout, TimeUnit unit);

	// 提交一个有返回值的待执行任务,并返回一个代表任务的返回值的Future。调用Future.get()方法会阻塞,直到任务执行完毕,会返回任务的返回值。
	// 如果允许立即阻塞并等待返回值,可以用如下形式:exec.submit(aCallable).get();
	<T>  Future<T>	submit(Callable<T> task);
	
	// 同上
	<T> Future<T> submit(Runnable r, T result);
	
	//  同上,但Future.get()会阻塞,直到任务执行完毕,返回一个null
	Future<?> submit(Runnable r);
	
	// 阻塞直到执行完集合tasks中的所有任务,然后返回封装了每个任务的执行状态和结果的Future的List。
	// 由于方法会阻塞直到所有任务都执行完才会返回,因此对于列表中的每个Future应用isDone方法的返回值必定是true
	// tasks中的任务可能是正常运行完毕后返回,也可能是出现异常而中止。
	// 如果在方法阻塞过程中更新了集合tasks,那么返回的List的具体情况未可知
	// 返回的List中的Future的顺序与用iterator()遍历tasks的顺序完全一致
	// 如果阻塞过程中被中断,则抛出中断异常,并且所有未完成的任务将被取消
	// 如果列表中的某个元素为null,则会抛出异常
	<T> List<Future<T>> invokeAll(Collection<? extends Callable> tasks);
	
	// 同上,但如果超时返回,则执行中而未完成的任务会被撤销
	<T> List<Future<T>> invokeAll(Colleciton<? extends Callable> tasks, long timeout, TimeUnit unit);

	// 阻塞,直到tasks中的任一个任务正常执行完毕,然后返回这个任务的执行结果。必须有一个任务正常执行完毕。
	// 无论方法是正常返回还是异常终止,都会撤销所有正在执行的未完成的任务
	// 如果参数tasks为null,抛出IllegalArgumentException
	// 如果没有任务正常执行完毕,
	<T> T invokeAny(Collection<? extends Callable> tasks) throw ExecutionException;
	
}

Callable

用于创建有返回值并且可以抛出受检异常的任务。
与Runnable接口相同的是:两者都是为了创建需要运行在另外的线程中的任务而设计。但Runnable接口不能返回结果,且不能抛出受检异常。
Executors类提供了将其它形式的任务转换为Callable接口的实例的工具方法。

@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算出一个结果, or throws an exception if unable to do so.
     *
     * @return 计算结果
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Executors中提供的将Runnable实例转换为Callable实例的工具:

public class Executors{

	/**
	* 通常在需要转换成Callable去执行,但是又不需要返回值时,可以用如下形式:
	* Callable<?> callable = new RunnableAdapter<Void>(aRunnable, null);
	* Void 是 void的封装类。void 是和 java八大基本类型相同的一个java 原生类型。
	*/	
	static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

}

Future

一个Future实例代表一个异步计算的执行结果。
提供了阻塞等待异步计算执行完成后获取结果(get)、撤销计算任务(cancel)、检查异步计算是否已经完成(isDone)、检查计算是否被取消(isCancelled)的接口方法。
如果计算已经执行完成,则无法被取消cancel。

public interface Future<T>{


    /**
     * 如果任务已经执行完成、或者已经被取消,则取消失败,方法返回false。
     * 如果任务正在执行中,会根据参数mayInterruptIfRunning判断是否将执行这个任务的线程的中断状态置位
     * 当此方法返回后,如果调用isDone方法,则其返回值为true。
     * 如果此方法返回true,那么后续调用isCancelled方法的返回值为true。  
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 如果任务在执行完成前被成功地cancel()了,则返回true
     */
    boolean isCancelled();

    /**
     * 无论任务是正常运行结束或者异常中止,都返回true。
     */
    boolean isDone();

    /**
     * 阻塞等待任务执行完成,然后获取任务的返回值
     * @return 计算结果
     * @throws CancellationException 如果任务被取消了
     * @throws ExecutionException 如果任务运行过程中抛出了异常
     * @throws InterruptedException 如果应用get()方法的当前线程被中断了:如果当前线程在阻塞等待任务计算完成的过程中被中断了
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 阻塞等待任务执行完毕,并返回任务的计算结果。
     * @throws CancellationException 如果任务已经被cancelled
     * @throws ExecutionException 如果任务执行过程中抛出了异常
     * @throws InterruptedException 如果当前线程在阻塞等待的过程中被中断了
     * @throws TimeoutException 如果等待超时了
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

}

RunnableFuture

一个RunnableFuture实例是一个可以代表一个Runnable实例的执行结果的Future对象。
当Runnable的run方法被执行完毕后,Future对象就完成了,并可以访问它得到运行结果。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * run()方法中将设置当前RunnableFuture对象代表其计算结果
     */
    void run();
}

FutureTask

一个可取消的异步计算,FutureTask类提供了Future接口的基本实现,实现了启动和取消计算、查看计算是否已经完成、获取计算结果的接口方法
只有计算执行完毕后,计算结果才能被获取到。如果计算还未完成、get方法将会阻塞当前线程。
计算一旦被执行完成,计算任务不能再被重新启动或者取消、除非应用runAndReset方法。

FutureTask可以用于封装Runnable对象或者Callable对象,因为FutureTask实现了Runnable接口,因此可以提交到executor执行。

public class FutureTask<V> implements RunnableFuture<V> {
	private volatile int state;		// 任务当前的执行状态
	private static final int NEW = 0;
	private static final int COMPLETING = 1;
	private static final int NORMAL = 2;
	private static final int EXCEPTIONAL = 3;
	private static final int CANCELLED = 4;
	private static final int INTERRUPTING = 5;
	private static final int INTERRUPTED = 6;
	/**
	* state的值从NEW转移到最终状态只能经由以下三个方法:set \ cancel -> CANCELLED \ 	setException ->EXCEPTIONAL
	* state的值的可能转移过程:
	* NEW - COMPLETING - NORMAL
	* NEW - COMPLETING - EXCEPTIONAL
	* NEW - CANCELLED
	* NEW - INTERRUPTING - INTERRUPTED
	* 在completion过程中,state的值可能会经由瞬间状态:COMPLETING 或 INTERRUPTING 
	*/
	/**
	* 版本修订:之前的版本使用AQS来对阻塞在get()方法上的线程同步。但是当前版本使用CAS更新state来独立地完成线程同步,以及一个Treiber堆栈来存储阻塞在get()上的线程。
	*/
	
	/** 隐含的callable , 这个任务被执行完毕后会被置为null */
	private Callable<V>	callable;
	/** 调用get()方法后要返回的运行的结果、或者要抛出的异常 */
	private Object outcome;		// 没有将其设置为volatile的,而是用state的reads\writes来保护的
	
    /** 用于执行callable任务的线程; CASed during run() */
    private volatile Thread runner;		// 当进入run方法后,首先将当前线程CAS赋值给runner变量
    /** 一个用来存储阻塞等待在get()上的线程的treiber 栈 */
    private volatile WaitNode waiters;	// treiber栈实际是一个基于CAS原子操作的无锁并发栈
	
	   /**
     * 返回任务运行的结果、或者抛出任务运行过程中的异常
     * @param s state的值
     */
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

   /**
     * 构造器一个新的FutureTask任务,当运行这个任务时,会执行callable的代码
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
        /**
     * 构造一个FutureTask任务,任务被执行时会运行runnable的代码,当任务被正常执行完毕后,get方法将返回给定的result参数
     * 如果没有特定的返回值,可以使用如下格式:Future<?> f = new FutureTask<Void>(runnable, null)}
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&		// 只有在NEW状态下,成功cas更新了state的状态。是继续cancel的前提,否则直接return false;
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {	// 如果需要中断,则将最终状态设置为INTERRUPTED。
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
  /**
     * @throws CancellationException 如果任务已经被成功地cancelled
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {... }
    /**
     * 将v赋值给代表任务的计算结果的变量outcome。如果此FutureTask已经被set过,或者已经被cancelled,则不会做任何操作。
     * 这个方法是在run方法成功执行完毕后被调用的
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
  /**
     * 将t赋值给代表任务的计算结果的对象outcome,如果此Future已经被set过,或者已经被cancelled,则不会做任何动作
     * 这个方法是在run方法异常退出时被调用的。
     */
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    } 

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);    // run方法异常退出,调用setException将异常赋值给outcome
                }
                if (ran)
                    set(result);	// run方法正常退出,调用
            }
        } finally {
            runner = null;	// 在未将state的状态更新成最终值之前,runner不可为空,以防止并发的执行run
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

    /**
     * 执行计算任务,但不设置结果。然后将当前对象重置为初始状态。
     * 如果执行过程中抛出了异常就返回false。
     * 用于自身逻辑上就需要执行多次的任务。
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

在FutrueTask类中,间接继承自Future接口的方法(get、cancell、isDone、isCancelled),是客户端方法,被客户端线程调用。
而间接继承自Runnable接口的方法(run),则是会被任务真正的执行线程所调用的方法,比如线程池中的某个工作线程。

那么FutureTask类是如何协调这两种线程呢?对于客户端线程,大致流程如下:

  • 首先:某客户端线程,把真正要执行的任务也即一个Runable实例r ,构造成一个FutureTask(Runnable r, V v)实例对象
  • 接着:某客户端线程,将这个FutureTask实例提交到一个线程池;或者客户端线程直接创建一个新的线程,来执行这个实现了Ruannble接口的FutureTask实例:new Thread(futureTask).start();
  • 当客户端线程将FutureTask实例提交给了另外的线程或者线程池后,新线程或者线程池会在其后的某个时间点对提交来的FutureTask实例应用其run()方法,但是具体何时执行无法预测。
  • 接着:创建并提交了任务的客户端线程,通常会在其后的逻辑中,应用FutureTask实例的客户端方法(get \ cancell \ isDone \ isCancelled) 来等待任务执行完毕、或者获取 run()的结果、或者取消任务。
  • 当客户端线程应用了FutureTask实例的get()方法futureTask.get(),get()方法就会将客户端线程加入等待队列 waiters(),并使其阻塞,也就是说客户端线程会被阻塞在futureTask对象上:LockSupport.park(this, nanos);。直到真正执行任务的线程应用r.run()并执行完毕后,会唤醒这些等待线程(LockSupport.unpark(t) );或者直到客户端线程被中断 t.interrupt() ;或者其它客户端线程对任务应用了cancel()方法
  • 当客户端线程应用了FutureTask实例的cancel方法futureTask.cancel(),客户端线程会对真正执行任务的线程runner应用runner.interrupt()方法,中断任务的执行,然后会唤醒waiters等待队列中的所有阻塞在get()方法上的客户端线程。

对于真正执行任务的线程:

  • 任务的真正执行线程,会应用futureTask的run方法。
  • 执行线程在执行futureTask的run方法时,首先会设置对象的属性runner为当前执行线程:UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())
  • 接着:调用构造FutureTask实例时的Runnable r的run方法,执行真正的任务逻辑。
  • 最后:LockSupport.unpark(t)唤醒waiters队列中的所有阻塞在get()方法上的线程。无论r.run是正常执行完毕还是异常结束。

以上内容简单表述了提交任务的客户端线程与任务的执行线程是如何协调的,其中应用了volatile + UNSAFE.compareAndSwapXXX、LockSupport.parkNanos(this, nanos)、LockSupport.park(this)、LockSupport.unpark()、t.interrupt()、Thread.yeild()、等JDK原生线程间协调的API。有如下关键点:

  • volatile Thread runner ; // 任务的真正执行线程,用于防止出现多个执行线程同时执行这个任务的情况,当多个执行线程同时进入futureTask.run()方法后,会执行if(state!=NEW || !UNSAFE.casObjct(this, runnerOffset, null, Thread.current())) return false; ,即,如果runer不为null,说明已经有线程在执行这个任务,当前执行线程应退出。
  • volatile int state; // 任务的状态,也是同UNSAFE.casInt来控制。
  • volatile WaitNode waiters; // 调用get()方法后的客户端线程t,如果任务还没被执行完毕,则客户端线程会被构造成一个WaitNode节点q,q.thread=t;,然后头插法插入waiters链表:s = waiters; UNSAFE.casObject(this, waitersOffset, q.next = s, q );并被阻塞

FutureTask实现了RunnableFuture,则其本身就是一个Runnable,可以通过Executor.execute®方法提交并执行。
这个Runnable的实现:run() { ... result = callable.call(); ... }
FutureTask类是一个Runnable,肯定是要执行一个任务的,这个任务用成员变量 Callable<V\> callable;表示;
FutureTask类需要提供监控任务的状态的功能,这个功能用成员变量volatile int state; 来承载。
FutureTask提供的阻塞等待任务运行、以待结束后取任务结果的功能,是通过一个单链表volatile WaitNode waiters;来实现的,当某线程调用get()方法后,就会将自身cas头插法加入waiters队列,然后调用 LockSupport.park()方法阻塞等待,直到run | cancel这两个的方法执行过程中,调用LockSupport.unpark(t)唤醒waiters中所有阻塞等待的线程。
FutureTask类中定义了成员变量Object outcome,用于存储计算结果、或者run方法执行过程中抛出的异常。
还定义了成员变量volatile Thread runner,用于记录执行这个任务的线程。

run()方法是在任务被Executor执行时调用的:

  1. 首先,任务必须是处于state=NEW的状态,其它状态下说明任务已经开始执行或者执行结束了。
  2. 接着,调用casRunner(this,runnerOffset, null, Thread.currentThread()),设置任务的执行线程runner为当前线程。因为有可能多个线程竞争执行同一个任务,因此必须用cas+volatile来更新
  3. 接着,调用V v = callable.call();,执行任务真正的计算逻辑
  4. 如果call()方法成功执行完毕,则调用set()方法,先casState(this, stateOffset, NEW, COMPLETING); outcome=v;,最后putOrderedInt(this, stateOffset, NORMAL),最后遍历waiters,将所有阻塞等待获取任务运行结果的线程唤醒。
  5. 如果call()方法抛出了异常exception,则调用setException(e)方法,先casState(this, stateOffset, NEW, COMPLETING ),再outcom=e.,最后putOrderedInt(this, stateOffset, EXCEPTIONTAL),最后调用finishCompletion(),将waiters中所有等待获取任务结果的线程唤醒。

cancel( mayInterruptIfRunning )方法则是会根据业务逻辑被其它线程主动调用:

  1. 首先,任务状态必须处于state=NEW的状态;
  2. 接着,调用casState( this, stateOffset, NEW, mayInterruptIfRunning? INTERRUPTING : CANCELLED )
  3. 接着,if(mayInterruptIfRunning ) { try { runner.interrupt(); } finally { putOrderedInt(this, stateOffset, INTERRUPTED); } },将执行线程runner的中断状态置位。无论这个置位是否成功,都要将状态更新为INTERRPTED;
  4. 最后,调用finishComplete()方法,唤醒waiters链表中所有等待获取执行结果的线程。

可以看出,run()方法执行过程中,state有可能出现两种路径的转变:

  • 如果callable.call()成功执行完成,则调用set(v)设置state为: NEW -> COMPLETING -> NORMAL
  • 如果callable.call()执行过程中抛出异常,则调用setException(e)设置state为: NEW -> COMPLETING -> EXCEPTIONAL
    而cancel(mayInterrupted)的执行,state也会出现两种路径的转变:
  • 如果mayInterrupted==true,则state为: NEW -> INTERRUPTING -> INTERRUPTED
  • 如果mayInterrupted==false,则state为: NEW -> CANCELLED
    无论run()调用set(v)还是setException(e),这两个方法最后都会调用finishCompletion()唤醒waiters中所有park在这个任务上的线程。cancel()方法也会调用finishCompletion()方法。

get()方法,则是会调用await()方法,在这个方法中线程会将自己加入waiters链表,然后park(this)阻塞等待,直到run()或者cancel()方法执行后,唤醒这些等待线程。

isDone()方法:只要state!=NEW,就返回true。因为非NEW状态,要么是call方法已经执行完(不会再阻塞了),要么是cancel()了。

精彩评论(0)

0 0 举报