ScheduledThreadPool是一个线程池,用于执行定时任务。它基于ThreadPoolExecutor实现,具有一些特殊的调度功能。下面是它的实现原理代码:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 定时任务队列
private final DelayedWorkQueue delayedQueue = new DelayedWorkQueue();
// 定时任务线程
private ScheduledFutureTask scheduledTask;
// 创建ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new SynchronousQueue<Runnable>());
}
// 实现ScheduledExecutorService.schedule
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 将任务封装到ScheduledFutureTask中
ScheduledFutureTask<?> scheduledTask = new ScheduledFutureTask<>(command, null,
System.nanoTime() + unit.toNanos(delay));
// 添加到定时任务队列
delayedQueue.add(scheduledTask);
// 如果该任务是最早的一个,重建定时任务线程
if (scheduledTask == delayedQueue.peek())
scheduleFromNow();
return scheduledTask;
}
// 实现ScheduledExecutorService.scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 将任务封装到ScheduledFutureTask中
ScheduledFutureTask<Void> scheduledTask = new ScheduledFutureTask<Void>(command,
null, System.nanoTime() + unit.toNanos(initialDelay)) {
public void run() {
// 运行任务
super.run();
// 将该任务添加到定时任务队列
if (!isCancelled())
setNextRunTime(getNextRunTime() + unit.toNanos(period));
if (isShutdown() && remove(this))
reject(this);
}
};
// 添加到定时任务队列
delayedQueue.add(scheduledTask);
// 如果该任务是最早的一个,重建定时任务线程
if (scheduledTask == delayedQueue.peek())
scheduleFromNow();
return scheduledTask;
}
// 实现ScheduledExecutorService.scheduleWithFixedDelay
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 将任务封装到ScheduledFutureTask中
ScheduledFutureTask<Void> scheduledTask = new ScheduledFutureTask<Void>(command,
null, System.nanoTime() + unit.toNanos(initialDelay)) {
public void run() {
// 运行任务
super.run();
// 将该任务添加到定时任务队列
if (!isCancelled())
setNextRunTime(System.nanoTime() + unit.toNanos(delay));
if (isShutdown() && remove(this))
reject(this);
}
};
// 添加到定时任务队列
delayedQueue.add(scheduledTask);
// 如果该任务是最早的一个,重建定时任务线程
if (scheduledTask == delayedQueue.peek())
scheduleFromNow();
return scheduledTask;
}
// 重建定时任务线程
private void scheduleFromNow() {
for (;;) {
ScheduledFutureTask task = delayedQueue.peek();
if (task == null)
break;
// 计算任务下次执行时间
long time = task.getNextRunTime() - System.nanoTime();
if (time <= 0) {
// 下次执行时间已过,执行任务
executeTask(task);
} else {
// 下次执行时间未到,创建新的定时任务线程
if (scheduledTask == null || scheduledTask.getNextRunTime() > task.getNextRunTime()) {
if (scheduledTask != null)
scheduledTask.cancel(false);
scheduledTask = new ScheduledFutureTask<Void>(this, null,
System.nanoTime() + time) {
public void run() {
scheduleFromNow();
}
};
// 执行定时任务线程
super.execute(scheduledTask);
}
break;
}
}
}
// 执行任务
private void executeTask(Runnable runnable) {
if (isShutdown())
reject();
else
super.execute(runnable);
}
// 停止定时任务线程
public void shutdown() {
scheduledTask.cancel(false);
super.shutdown();
}
// 定时任务包装类
static class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
// 下次执行时间
private long nextRunTime;
// 构造函数
public ScheduledFutureTask(Runnable runnable, V result, long nextRunTime) {
super(runnable, result);
this.nextRunTime = nextRunTime;
}
// 获取下次执行时间
public long getNextRunTime() {
return nextRunTime;
}
// 设置下次执行时间
public void setNextRunTime(long nextRunTime) {
this.nextRunTime = nextRunTime;
}
// 比较下次执行时间
public int compareTo(Delayed delayed) {
if (delayed == this)
return 0;
if (delayed instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>)delayed;
long diff = nextRunTime - that.nextRunTime;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (hashCode() < that.hashCode())
return -1;
else
return 1;
}
long diff = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
}
// 定时任务队列
private class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 任务队列
private final PriorityQueue<ScheduledFutureTask<?>> taskQueue = new PriorityQueue<>();
// 实现AbstractQueue.iterator
public Iterator<Runnable> iterator() {
return new Iterator<Runnable>() {
final Iterator<ScheduledFutureTask<?>> i = taskQueue.iterator();
public boolean hasNext() { return i.hasNext(); }
public Runnable next() { return i.next(); }
public void remove() { i.remove(); }
};
}
// 实现AbstractCollection.size
public int size() {
return taskQueue.size();
}
// 实现BlockingQueue.remainingCapacity
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
// 实现BlockingQueue.offer
public boolean offer(Runnable o) {
if (!(o instanceof ScheduledFutureTask))
return false;
ScheduledFutureTask<?> task = (ScheduledFutureTask<?>)o;
Boolean f = taskQueue.offer(task);
if (f && taskQueue.peek() == task)
scheduleFromNow();
return f;
}
// 实现BlockingQueue.take
public ScheduledFutureTask<?> take() throws InterruptedException {
return (ScheduledFutureTask<?>)super.take();
}
// 实现BlockingQueue.poll
public ScheduledFutureTask<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
return (ScheduledFutureTask<?>)super.poll(timeout, unit);
}
// 实现BlockingQueue.put
public void put(Runnable o) throws InterruptedException {
offer(o);
}
// 实现BlockingQueue.poll
public boolean offer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
return offer(o);
}
// 实现BlockingQueue.peek
public ScheduledFutureTask<?> peek() {
return (ScheduledFutureTask<?>)taskQueue.peek();
}
// 实现BlockingQueue.poll
public ScheduledFutureTask<?> poll() {
ScheduledFutureTask<?> task = taskQueue.poll();
if (task != null && taskQueue.peek() != null)
scheduleFromNow();
return task;
}
// 实现BlockingQueue.remove
public boolean remove(Object o) {
return taskQueue.remove(o);
}
// 实现AbstractQueue.clear
public void clear() {
taskQueue.clear();
}
// 实现AbstractQueue.isEmpty
public boolean isEmpty() {
return taskQueue.isEmpty();
}
}
}
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,所以它的线程池部分与ThreadPoolExecutor一样。它使用DelayedWorkQueue作为任务队列,DelayedWorkQueue继承了AbstractQueue,并实现了BlockingQueue接口。它支持基于延迟的任务调度方式,并且能够处理周期性重复任务。ScheduledFutureTask是定时任务的包装类,它实现了ScheduledFuture接口,它封装了Runnable任务,下一次执行的时间以及与其它定时任务的比较实