0
点赞
收藏
分享

微信扫一扫

ScheduledThreadPool原理

ScheduledThreadPool是一个用于执行延迟任务和周期性任务的线程池。它基于ThreadPoolExecutor实现,并在其之上增加了按时间调度任务的能力。下面是一段用于说明其实现原理的简化示例代码(并非JDK源码):

/**
 * Simplified illustration of a scheduled thread pool built on top of
 * {@code ThreadPoolExecutor}.
 *
 * <p>How it works: every submitted task is wrapped in a {@link ScheduledFutureTask}
 * that carries its next run time (a {@code System.nanoTime()} based instant). The
 * wrappers are stored in a {@link DelayedWorkQueue}, which is installed as the pool's
 * own work queue. Worker threads block in {@code take()} until the task at the head
 * of the queue is due, so no separate timer thread is needed. Periodic tasks compute
 * their next run time after each successful run and put themselves back into the
 * queue.
 *
 * <p>Shutdown policy: {@link #shutdown()} cancels periodic tasks, while one-shot
 * tasks that were already scheduled still run at their due time. Tasks submitted
 * after shutdown are handed to the rejected-execution handler.
 */
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    // Delay-ordered task queue; the same instance is the pool's work queue.
    private final DelayedWorkQueue delayedQueue;

    /**
     * Creates a scheduled pool with the given number of worker threads.
     *
     * @param corePoolSize number of worker threads; must be at least 1 because the
     *                     workers are what wait on the delay queue
     * @throws IllegalArgumentException if {@code corePoolSize < 1}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        this(corePoolSize, new DelayedWorkQueue());
    }

    // Lets the constructor keep a typed reference to the queue it hands to the superclass.
    private ScheduledThreadPoolExecutor(int corePoolSize, DelayedWorkQueue queue) {
        // The queue is unbounded, so the pool never grows beyond corePoolSize and the
        // maximum size / keep-alive values have no practical effect.
        super(requireWorker(corePoolSize), Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, queue);
        this.delayedQueue = queue;
    }

    // Fails fast on a pool size that would leave nobody to poll the delay queue.
    private static int requireWorker(int corePoolSize) {
        if (corePoolSize < 1) {
            throw new IllegalArgumentException("corePoolSize must be at least 1: " + corePoolSize);
        }
        return corePoolSize;
    }

    /**
     * Runs the command as soon as a worker is free, by scheduling it with zero delay.
     * Routing through {@code schedule} guarantees that only {@link ScheduledFutureTask}
     * instances ever reach the delay queue.
     */
    @Override
    public void execute(Runnable command) {
        schedule(command, 0, TimeUnit.NANOSECONDS);
    }

    /**
     * Schedules a one-shot task.
     *
     * @param command task to run
     * @param delay   time to wait before running; negative values are treated as 0
     * @param unit    unit of {@code delay}
     * @return a future that completes (with {@code null}) once the task has run
     * @throws NullPointerException if {@code command} or {@code unit} is null
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        ScheduledFutureTask<Void> task =
                new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit));
        delayedExecute(task);
        return task;
    }

    /**
     * Schedules a one-shot value-returning task.
     *
     * @param callable task to run
     * @param delay    time to wait before running; negative values are treated as 0
     * @param unit     unit of {@code delay}
     * @return a future holding the callable's result
     * @throws NullPointerException if {@code callable} or {@code unit} is null
     */
    @Override
    public <V> ScheduledFuture<V> schedule(java.util.concurrent.Callable<V> callable,
                                           long delay, TimeUnit unit) {
        if (callable == null || unit == null) {
            throw new NullPointerException();
        }
        ScheduledFutureTask<V> task =
                new ScheduledFutureTask<V>(callable, triggerTime(delay, unit));
        delayedExecute(task);
        return task;
    }

    /**
     * Schedules a periodic task whose runs are due at {@code initialDelay},
     * {@code initialDelay + period}, {@code initialDelay + 2 * period}, ...
     * A run that throws suppresses all later runs.
     *
     * @throws NullPointerException     if {@code command} or {@code unit} is null
     * @throws IllegalArgumentException if {@code period <= 0}
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                  long period, TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        if (period <= 0) {
            throw new IllegalArgumentException();
        }
        return schedulePeriodic(command, triggerTime(initialDelay, unit),
                unit.toNanos(period), true);
    }

    /**
     * Schedules a periodic task where each run starts {@code delay} after the previous
     * run finished. A run that throws suppresses all later runs.
     *
     * @throws NullPointerException     if {@code command} or {@code unit} is null
     * @throws IllegalArgumentException if {@code delay <= 0}
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                     long delay, TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        if (delay <= 0) {
            throw new IllegalArgumentException();
        }
        return schedulePeriodic(command, triggerTime(initialDelay, unit),
                unit.toNanos(delay), false);
    }

    /**
     * Initiates shutdown: no new tasks are accepted, queued periodic tasks are
     * cancelled and removed, and queued one-shot tasks still run when they are due.
     * Safe to call on a pool that never scheduled anything.
     */
    @Override
    public void shutdown() {
        // Flip the pool state first so a periodic task finishing concurrently sees
        // isShutdown() == true and does not re-queue itself behind this sweep.
        super.shutdown();
        for (Runnable queued : delayedQueue.toArray(new Runnable[0])) {
            if (queued instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> task = (ScheduledFutureTask<?>) queued;
                // ThreadPoolExecutor.remove also re-checks for termination, which
                // releases workers that are left waiting on a now-empty queue.
                if (task.isPeriodic() && remove(task)) {
                    task.cancel(false);
                }
            }
        }
    }

    // Converts a relative delay into an absolute nanoTime-based trigger time.
    private static long triggerTime(long delay, TimeUnit unit) {
        long nanos = unit.toNanos(Math.max(0L, delay));
        // Cap the delay so the difference between two trigger times cannot overflow
        // in ScheduledFutureTask.compareTo.
        return System.nanoTime() + Math.min(nanos, Long.MAX_VALUE >> 1);
    }

    // Builds a self-rescheduling task shared by the fixed-rate and fixed-delay variants.
    private ScheduledFuture<?> schedulePeriodic(Runnable command, long firstRunTime,
                                                final long stepNanos, final boolean fixedRate) {
        ScheduledFutureTask<Void> task =
                new ScheduledFutureTask<Void>(command, null, firstRunTime) {
                    @Override
                    public boolean isPeriodic() {
                        return true;
                    }

                    @Override
                    public void run() {
                        if (ScheduledThreadPoolExecutor.this.isShutdown()) {
                            // Dequeued by a worker just as the pool shut down.
                            cancel(false);
                            return;
                        }
                        // runAndReset leaves the future re-runnable; it returns false
                        // after cancellation or an exception, which ends the series.
                        if (runAndReset()) {
                            long base = fixedRate ? getNextRunTime() : System.nanoTime();
                            setNextRunTime(base + stepNanos);
                            reExecutePeriodic(this);
                        }
                    }
                };
        delayedExecute(task);
        return task;
    }

    // Queues a new task, or rejects it if the pool has been shut down.
    private void delayedExecute(ScheduledFutureTask<?> task) {
        if (isShutdown()) {
            getRejectedExecutionHandler().rejectedExecution(task, this);
            return;
        }
        delayedQueue.add(task);
        if (isShutdown() && task.isPeriodic() && remove(task)) {
            // Lost a race with shutdown(): undo the enqueue of a periodic task.
            task.cancel(false);
        } else if (getPoolSize() < getCorePoolSize()) {
            // Start an idle worker; it blocks on the queue until the task is due.
            prestartCoreThread();
        }
    }

    // Puts a periodic task back into the queue after a successful run.
    private void reExecutePeriodic(ScheduledFutureTask<?> task) {
        if (isShutdown()) {
            task.cancel(false);
            return;
        }
        delayedQueue.add(task);
        if (isShutdown() && remove(task)) {
            // Lost a race with shutdown(): undo the enqueue.
            task.cancel(false);
        }
    }

    /**
     * Wrapper that pairs a task with its next run time and orders tasks by that time.
     *
     * @param <V> result type of the wrapped task
     */
    static class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {

        // Source of tie-breakers so tasks due at the same instant run in FIFO order.
        private static final java.util.concurrent.atomic.AtomicLong SEQUENCER =
                new java.util.concurrent.atomic.AtomicLong();

        // Creation order of this task; used only to break ties in compareTo.
        private final long sequenceNumber = SEQUENCER.getAndIncrement();

        // Next run time as a System.nanoTime() value; volatile because getDelay may be
        // called from any thread.
        private volatile long nextRunTime;

        /**
         * @param runnable    task to run
         * @param result      value returned by {@code get()} on completion
         * @param nextRunTime first run time, as a {@code System.nanoTime()} value
         */
        public ScheduledFutureTask(Runnable runnable, V result, long nextRunTime) {
            super(runnable, result);
            this.nextRunTime = nextRunTime;
        }

        /**
         * @param callable    task to run
         * @param nextRunTime run time, as a {@code System.nanoTime()} value
         */
        public ScheduledFutureTask(java.util.concurrent.Callable<V> callable, long nextRunTime) {
            super(callable);
            this.nextRunTime = nextRunTime;
        }

        /** Returns the next run time as a {@code System.nanoTime()} value. */
        public long getNextRunTime() {
            return nextRunTime;
        }

        /**
         * Sets the next run time. Must only be called while the task is NOT in a
         * queue, because changing the sort key of a queued element corrupts the heap.
         */
        public void setNextRunTime(long nextRunTime) {
            this.nextRunTime = nextRunTime;
        }

        /** Returns whether this task reschedules itself after each run. */
        public boolean isPeriodic() {
            return false;
        }

        /** Returns the time remaining until the next run; zero or negative when due. */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(nextRunTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        /** Orders by next run time, then by creation order. */
        @Override
        public int compareTo(Delayed delayed) {
            if (delayed == this) {
                return 0;
            }
            if (delayed instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) delayed;
                // Subtract rather than compare directly: nanoTime values may wrap.
                long diff = nextRunTime - that.nextRunTime;
                if (diff != 0) {
                    return diff < 0 ? -1 : 1;
                }
                return sequenceNumber < that.sequenceNumber ? -1 : 1;
            }
            long diff = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
    }

    /**
     * Unbounded blocking queue that releases a task only once its delay has elapsed.
     * All access to the underlying heap is guarded by a single lock.
     */
    private static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {

        // Heap ordered by ScheduledFutureTask.compareTo (earliest run time first).
        private final PriorityQueue<ScheduledFutureTask<?>> taskQueue = new PriorityQueue<>();

        private final java.util.concurrent.locks.ReentrantLock lock =
                new java.util.concurrent.locks.ReentrantLock();

        // Signalled whenever a new task becomes the head of the queue.
        private final java.util.concurrent.locks.Condition available = lock.newCondition();

        /** Iterates over a snapshot of the queue, in no particular order. */
        @Override
        public Iterator<Runnable> iterator() {
            final Runnable[] snapshot;
            lock.lock();
            try {
                snapshot = taskQueue.toArray(new Runnable[0]);
            } finally {
                lock.unlock();
            }
            return new Iterator<Runnable>() {
                private int cursor;
                private Runnable lastReturned;

                @Override
                public boolean hasNext() {
                    return cursor < snapshot.length;
                }

                @Override
                public Runnable next() {
                    if (cursor >= snapshot.length) {
                        throw new java.util.NoSuchElementException();
                    }
                    lastReturned = snapshot[cursor++];
                    return lastReturned;
                }

                @Override
                public void remove() {
                    if (lastReturned == null) {
                        throw new IllegalStateException();
                    }
                    DelayedWorkQueue.this.remove(lastReturned);
                    lastReturned = null;
                }
            };
        }

        @Override
        public int size() {
            lock.lock();
            try {
                return taskQueue.size();
            } finally {
                lock.unlock();
            }
        }

        /** The queue is unbounded. */
        @Override
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }

        /**
         * Inserts a task. Returns {@code false} for anything that is not a
         * {@link ScheduledFutureTask}, because only those carry a run time.
         *
         * @throws NullPointerException if {@code o} is null
         */
        @Override
        public boolean offer(Runnable o) {
            if (o == null) {
                throw new NullPointerException();
            }
            if (!(o instanceof ScheduledFutureTask)) {
                return false;
            }
            ScheduledFutureTask<?> task = (ScheduledFutureTask<?>) o;
            lock.lock();
            try {
                taskQueue.offer(task);
                if (taskQueue.peek() == task) {
                    // The earliest deadline changed: every waiter must recompute how
                    // long it may sleep.
                    available.signalAll();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

        /** Never blocks (the queue is unbounded); rejects non-scheduled tasks loudly. */
        @Override
        public void put(Runnable o) throws InterruptedException {
            if (!offer(o)) {
                throw new ClassCastException("only ScheduledFutureTask instances can be queued");
            }
        }

        /** Never blocks (the queue is unbounded), so the timeout is ignored. */
        @Override
        public boolean offer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(o);
        }

        /** Blocks until the head task is due, then removes and returns it. */
        @Override
        public ScheduledFutureTask<?> take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                for (;;) {
                    ScheduledFutureTask<?> head = taskQueue.peek();
                    if (head == null) {
                        available.await();
                        continue;
                    }
                    long delay = head.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return taskQueue.poll();
                    }
                    // Wakes early if an earlier task is offered in the meantime.
                    available.awaitNanos(delay);
                }
            } finally {
                lock.unlock();
            }
        }

        /**
         * Waits up to the given timeout for the head task to become due.
         *
         * @return the due head task, or {@code null} if none became due in time
         */
        @Override
        public ScheduledFutureTask<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
            long remaining = unit.toNanos(timeout);
            lock.lockInterruptibly();
            try {
                for (;;) {
                    ScheduledFutureTask<?> head = taskQueue.peek();
                    long delay = (head == null)
                            ? Long.MAX_VALUE
                            : head.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return taskQueue.poll();
                    }
                    if (remaining <= 0) {
                        return null;
                    }
                    // Sleep no longer than the caller's budget or the head's due time.
                    long slice = Math.min(delay, remaining);
                    long unused = available.awaitNanos(slice);
                    remaining -= slice - unused;
                }
            } finally {
                lock.unlock();
            }
        }

        /** Returns the head task regardless of whether it is due, or {@code null}. */
        @Override
        public ScheduledFutureTask<?> peek() {
            lock.lock();
            try {
                return taskQueue.peek();
            } finally {
                lock.unlock();
            }
        }

        /** Removes and returns the head task only if it is already due. */
        @Override
        public ScheduledFutureTask<?> poll() {
            lock.lock();
            try {
                ScheduledFutureTask<?> head = taskQueue.peek();
                if (head == null || head.getDelay(TimeUnit.NANOSECONDS) > 0) {
                    return null;
                }
                return taskQueue.poll();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public boolean remove(Object o) {
            lock.lock();
            try {
                return taskQueue.remove(o);
            } finally {
                lock.unlock();
            }
        }

        @Override
        public void clear() {
            lock.lock();
            try {
                taskQueue.clear();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public boolean isEmpty() {
            lock.lock();
            try {
                return taskQueue.isEmpty();
            } finally {
                lock.unlock();
            }
        }

        /** Moves every queued task into {@code c}, whether or not it is due yet. */
        @Override
        public int drainTo(java.util.Collection<? super Runnable> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }

        /**
         * Moves up to {@code maxElements} queued tasks into {@code c}, earliest first,
         * whether or not they are due yet.
         *
         * @throws NullPointerException     if {@code c} is null
         * @throws IllegalArgumentException if {@code c} is this queue
         */
        @Override
        public int drainTo(java.util.Collection<? super Runnable> c, int maxElements) {
            if (c == null) {
                throw new NullPointerException();
            }
            if (c == this) {
                throw new IllegalArgumentException();
            }
            lock.lock();
            try {
                int moved = 0;
                while (moved < maxElements && !taskQueue.isEmpty()) {
                    // Add before removing so a failing add() does not lose the task.
                    c.add(taskQueue.peek());
                    taskQueue.poll();
                    moved++;
                }
                return moved;
            } finally {
                lock.unlock();
            }
        }
    }

}

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,所以它的线程池部分与ThreadPoolExecutor一样。它使用DelayedWorkQueue作为任务队列,DelayedWorkQueue继承了AbstractQueue,并实现了BlockingQueue接口。它支持基于延迟的任务调度方式,并且能够处理周期性重复任务。ScheduledFutureTask是定时任务的包装类,它实现了ScheduledFuture接口,封装了Runnable任务、下一次执行的时间,以及与其它定时任务进行比较的实现。

举报

相关推荐

0 条评论