Java Thread Pools
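How a thread pool works, in one sentence: it maintains a set of threads and a blocking queue. The queue holds tasks waiting to run, and each thread repeatedly takes a task from the queue and executes it.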
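To make that one-sentence model concrete, here is a deliberately minimal sketch of the idea. It is not how ThreadPoolExecutor is actually written. A fixed set of worker threads each loop on take() from one shared BlockingQueue. The class name ToyPool and the "poison pill" used to stop the workers are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy pool: N worker threads + one blocking queue of tasks
class ToyPool {
    private static final Runnable POISON = () -> { };   // marker that tells a worker to exit
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    ToyPool(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = queue.take();   // blocks until a task is available
                        if (task == POISON) {
                            return;                     // stop this worker
                        }
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "toy-worker-" + i);
            workers.add(t);
            t.start();
        }
    }

    // Producers only enqueue; the workers do the running
    void submit(Runnable task) {
        queue.add(task);
    }

    // Lets already-queued tasks finish (FIFO), then stops every worker and waits for it
    void shutdownAndJoin() throws InterruptedException {
        for (int i = 0; i < workers.size(); i++) {
            queue.add(POISON);
        }
        for (Thread t : workers) {
            t.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyPool pool = new ToyPool(3);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            pool.submit(done::incrementAndGet);
        }
        pool.shutdownAndJoin();
        System.out.println(done.get());   // 10
    }
}
```

The real ThreadPoolExecutor adds everything this toy leaves out: growing and shrinking the thread count, bounded queues, rejection, and lifecycle states.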
Constructor parameters
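public ThreadPoolExecutor(int corePoolSize,                      // threads kept in the pool even when idle (unless allowCoreThreadTimeOut is set)
                          int maximumPoolSize,                   // upper bound on the number of threads
                          long keepAliveTime,                    // how long a thread above corePoolSize may stay idle before it exits
                          TimeUnit unit,                         // unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue,     // holds tasks waiting to be executed
                          ThreadFactory threadFactory,           // creates new threads (e.g. to give them names)
                          RejectedExecutionHandler handler) {    // called when a task cannot be accepted
    // JDK implementation omitted
}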
How tasks are executed
What execute(Runnable command) does:
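- When the first task arrives, the thread count goes from 0 to 1: a new core thread is created to run it.
- For each further task, if the current thread count is below corePoolSize, another thread is created, even if existing threads are idle.
- Once corePoolSize threads exist, new tasks go into the queue. Only when the queue is full are extra threads created, up to maximumPoolSize in total. If the queue is full and maximumPoolSize has been reached, the task is rejected (step 3 in the comments below).
- Threads beyond corePoolSize exit after being idle for longer than keepAliveTime.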
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
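The three steps can be observed with a tiny pool. This sketch assumes corePoolSize = 1, maximumPoolSize = 2 and an ArrayBlockingQueue of capacity 1, and uses a CountDownLatch to keep every task busy so the pool cannot catch up. The 1st task starts a core thread, the 2nd waits in the queue, the 3rd (queue full) starts a non-core thread, and the 4th is rejected by the default AbortPolicy. The class and method names are made up for the demo.

```java
import java.util.concurrent.*;

class ExecuteFlowDemo {
    // Returns {poolSize, queueSize, rejected ? 1 : 0} observed after submitting 4 blocking tasks
    static int[] run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();                  // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // No handler argument, so the default AbortPolicy is used
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        boolean rejected = false;
        try {
            pool.execute(blocking);   // step 1: 0 < corePoolSize -> new core thread
            pool.execute(blocking);   // step 2: core threads busy -> task is queued
            pool.execute(blocking);   // step 3: queue full -> new non-core thread (2 <= max)
            try {
                pool.execute(blocking);   // queue full and max reached -> rejected
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected ? 1 : 0 };
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        // prints poolSize=2 queued=1 rejected=true
        System.out.printf("poolSize=%d queued=%d rejected=%b%n", r[0], r[1], r[2] == 1);
    }
}
```

Note the ordering this reveals: the queue is filled before any thread beyond corePoolSize is created, so with an unbounded queue maximumPoolSize never comes into play.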
Rejection policies
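- Throw an exception: new ThreadPoolExecutor.AbortPolicy() throws RejectedExecutionException. This is the default.
- Run in the caller: new ThreadPoolExecutor.CallerRunsPolicy() runs the task in the thread that called execute()/submit(), unless the pool has been shut down, in which case the task is discarded.
- Silently drop the task: new ThreadPoolExecutor.DiscardPolicy()
- Drop the oldest: new ThreadPoolExecutor.DiscardOldestPolicy() discards the oldest task in the queue and retries execute() with the current one, unless the pool has been shut down.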
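CallerRunsPolicy is easy to misread, so here is a small sketch that makes the "calling thread" concrete. It assumes a pool with one thread and a one-slot queue: the first task occupies the thread, the second fills the queue, and the third is rejected, so the policy runs it directly inside the execute() call on the submitting thread. The class and method names are hypothetical.

```java
import java.util.concurrent.*;

class CallerRunsDemo {
    // Saturates a 1-thread pool with a 1-slot queue, then submits one more task.
    // Returns true if that extra task ran on the calling thread.
    static boolean rejectedTaskRunsOnCaller() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];
        try {
            pool.execute(() -> {                                     // occupies the only worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });                                 // fills the queue
            pool.execute(() -> ranOn[0] = Thread.currentThread());   // rejected -> run by caller
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return ranOn[0] == caller;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(rejectedTaskRunsOnCaller());   // true
    }
}
```

A side effect of this policy is back-pressure: while the submitting thread is busy running the rejected task itself, it cannot submit more work.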
A custom thread pool
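This example uses a custom rejection handler. It behaves like DiscardOldestPolicy (drop the oldest queued task and retry the current one), but it also gives you a place to do extra work when a rejection happens, such as writing a log message.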
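import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// `log` is the enclosing class's logger (not shown)
ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 8,
                60L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(200),           // bounded: at most 200 waiting tasks
                new ThreadFactory() {
                    // names threads my-pool-1, my-pool-2, ...
                    private final AtomicInteger threadNumber = new AtomicInteger(1);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "my-pool-" + threadNumber.getAndIncrement());
                    }
                },
                (r, executor) -> {
                    // custom handler: log, then behave like DiscardOldestPolicy
                    log.warn("reject: Task " + r.toString() + " rejected from " + executor.toString());
                    if (!executor.isShutdown()) {
                        executor.getQueue().poll();   // drop the oldest queued task
                        executor.execute(r);          // retry the current task
                    }
                });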
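For the ThreadFactory argument, Guava's ThreadFactoryBuilder is more concise (threads are named my-pool-0, my-pool-1, ...):
// import com.google.common.util.concurrent.ThreadFactoryBuilder;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("my-pool-%d").build();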
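To watch the custom rejection handler in action, here is a runnable sketch. It shrinks the pool to 1 thread and a 2-slot queue so that a rejection happens on purpose. It assumes System.err in place of the `log` field and uses a lambda for the ThreadFactory. The task names A to D are made up. Task A holds the only thread, B and C fill the queue, and when D is rejected the handler drops B (the oldest) and queues D.

```java
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolDemo {
    // Returns the tasks that actually ran, in order (A also records its thread name)
    static List<String> run() throws InterruptedException {
        List<String> ran = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger threadNumber = new AtomicInteger(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2),
                r -> new Thread(r, "my-pool-" + threadNumber.getAndIncrement()),
                (r, executor) -> {
                    // stand-in for log.warn(...): print to stderr
                    System.err.println("reject: Task " + r + " rejected from " + executor);
                    if (!executor.isShutdown()) {
                        executor.getQueue().poll();   // drop the oldest queued task
                        executor.execute(r);          // retry the rejected one
                    }
                });
        try {
            pool.execute(() -> {
                ran.add("A@" + Thread.currentThread().getName());
                try {
                    release.await();                  // hold the only thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> ran.add("B"));   // queued, later dropped
            pool.execute(() -> ran.add("C"));   // queued
            pool.execute(() -> ran.add("D"));   // queue full -> handler drops B, queues D
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return ran;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());   // [A@my-pool-1, C, D]
    }
}
```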
ThreadPoolExecutor's public API
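Getting and changing the constructor parameters
These let you reconfigure the pool at runtime, e.g. its thread counts, thread factory, or rejection handler: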
1. void setCorePoolSize(int corePoolSize);
2. void setMaximumPoolSize(int maximumPoolSize);
3. void setKeepAliveTime(long time, TimeUnit unit);
4. void setThreadFactory(ThreadFactory threadFactory);
5. void setRejectedExecutionHandler(RejectedExecutionHandler handler);
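6. Each of these has a matching getter: getCorePoolSize(), getMaximumPoolSize(), getKeepAliveTime(TimeUnit), getThreadFactory(), getRejectedExecutionHandler()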
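A small sketch of resizing a pool at runtime. One detail worth knowing, assuming JDK 9 or later: setCorePoolSize throws IllegalArgumentException if the new core size exceeds the current maximumPoolSize, and setMaximumPoolSize throws if the new maximum is below the current core size. So when growing, raise the maximum first; when shrinking, lower the core size first. The helper name resize is hypothetical.

```java
import java.util.concurrent.*;

class ResizeDemo {
    // Changes both sizes, ordering the calls so that core <= max holds at every step
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core > max) {
            throw new IllegalArgumentException("core > max");
        }
        if (max >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(max);   // growing: raise the ceiling first
            pool.setCorePoolSize(core);
        } else {
            pool.setCorePoolSize(core);     // shrinking: lower the core size first
            pool.setMaximumPoolSize(max);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        resize(pool, 8, 16);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());   // 8/16
        resize(pool, 1, 2);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());   // 1/2
        pool.shutdown();
    }
}
```

With an unbounded queue like the one in main, raising maximumPoolSize alone has no visible effect, because extra threads are only added when the queue is full; it is the core size that matters.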
Monitoring
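1. BlockingQueue<Runnable> getQueue()   // the task queue; getQueue().size() is the number of waiting tasks
2. int getPoolSize()                    // current number of threads in the pool
3. int getActiveCount()                 // approximate number of threads actively executing tasks
4. int getLargestPoolSize()             // largest number of threads that have ever been in the pool at once
5. long getTaskCount()                  // approximate total number of tasks ever scheduled
6. long getCompletedTaskCount()         // approximate total number of tasks that have completed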
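A sketch of how these getters might be combined into a one-line status string, for example to log periodically. The helper name snapshot and the output format are assumptions. While the pool is busy most values are approximate; once the pool has terminated, the task counts are settled.

```java
import java.util.concurrent.*;

class PoolStats {
    // One-line snapshot of the monitoring getters
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("poolSize=%d active=%d largest=%d queued=%d tasks=%d completed=%d",
                pool.getPoolSize(), pool.getActiveCount(), pool.getLargestPoolSize(),
                pool.getQueue().size(), pool.getTaskCount(), pool.getCompletedTaskCount());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // After termination the counts are final: ... tasks=10 completed=10
        System.out.println(snapshot(pool));
    }
}
```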
Shutting down the pool
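// Initiates an orderly shutdown: previously submitted tasks are still executed, but no new tasks are accepted. Calling it on a pool that is already shut down has no additional effect.
// It does not block waiting for submitted tasks to finish.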
1. void shutdown(); 
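// Attempts to stop all actively executing tasks (by calling interrupt() on their threads), stops processing queued tasks, removes them from the queue and returns them as a list of tasks that never started.
// It does not block waiting for running tasks to finish.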
2. List<Runnable> shutdownNow();
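// Blocks until all tasks have finished after a shutdown request, the timeout expires, or the current thread is interrupted. Returns true if the pool terminated before the timeout, false otherwise.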
3. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; 
4. boolean isShutdown();
5. boolean isTerminating();
6. boolean isTerminated();
To shut down a pool, shutdown() and awaitTermination() are usually used together: shutdown() starts the shutdown, and awaitTermination() waits for it to complete before the code moves on.
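The combination can be wrapped in a helper. This sketch is adapted from the usage example in the ExecutorService Javadoc, with a configurable timeout and a boolean return value added as assumptions: shut down, wait, and if tasks are still running after the timeout, fall back to shutdownNow() and wait once more.

```java
import java.util.List;
import java.util.concurrent.*;

class Shutdowns {
    // Two-phase shutdown: stop accepting work, wait, then force-cancel what is left.
    // Returns true if the pool has terminated.
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                                           // reject new tasks, keep running queued ones
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                List<Runnable> neverRun = pool.shutdownNow();      // interrupt running tasks, drain the queue
                System.err.println(neverRun.size() + " queued tasks were never started");
                return pool.awaitTermination(timeout, unit);       // give interrupted tasks time to exit
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();                                    // we were interrupted: cancel everything
            Thread.currentThread().interrupt();                    // and preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        }
        System.out.println("terminated: " + shutdownAndAwaitTermination(pool, 5, TimeUnit.SECONDS));
    }
}
```

shutdownNow() only helps if tasks respond to interruption; a task that ignores interrupt() keeps running and the second awaitTermination() returns false.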
Prestarting (warming up) core threads
// Starts one core thread that waits idly for work; returns false if all core threads are already started
1. boolean prestartCoreThread(); 
// Starts all core threads; returns the number of threads started
2. int prestartAllCoreThreads();
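By default core threads are created lazily, one per incoming task. A short sketch of warming a pool up front, for example at application startup so that the first requests do not pay the thread-creation cost. The pool sizes are arbitrary.

```java
import java.util.concurrent.*;

class PrestartDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        System.out.println(pool.getPoolSize());              // 0: no threads yet
        System.out.println(pool.prestartCoreThread());       // true: one core thread started
        System.out.println(pool.prestartAllCoreThreads());   // 3: the remaining core threads
        System.out.println(pool.getPoolSize());              // 4
        pool.shutdown();
    }
}
```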
Common built-in pools
ExecutorService is only an interface. The standard library's Executors class provides factory methods for several commonly used pools:
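- FixedThreadPool (Executors.newFixedThreadPool): a pool with a fixed number of threads. Its LinkedBlockingQueue is unbounded, so waiting tasks can pile up without limit.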
new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
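- CachedThreadPool (Executors.newCachedThreadPool): the thread count adjusts to the load
An idle thread is reused if one is available; otherwise a new thread is created for the task (the SynchronousQueue stores no tasks), with a limit of Integer.MAX_VALUE threads. Idle threads exit after 60 seconds. This only suits situations with ample resources and is generally not recommended.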
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
- SingleThreadExecutor (Executors.newSingleThreadExecutor): a pool that runs tasks one at a time on a single thread
return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
- ScheduledThreadPool (Executors.newScheduledThreadPool): a pool for scheduled tasks
Creates a pool that can run commands after a given delay or periodically.
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
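// Fixed rate: here the first run is after 2 seconds, then one run every 3 seconds.
// If an execution throws an uncaught exception, all later executions are suppressed.
// If an execution takes longer than the period, the next one starts late; two executions never run concurrently.
// E.g. if the task always takes 5s and the period is 2s, it effectively runs once every 5s.
ScheduledFuture<?> xx = service.scheduleAtFixedRate(() -> {
    System.out.println("xx");
}, 2, 3, TimeUnit.SECONDS);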
// Scheduled tasks can be cancelled; true means interrupt the task if it is currently running
boolean cancel = xx.cancel(true);
// Fixed delay: the next execution starts a fixed time after the previous one finishes
ScheduledFuture<?> yy = service.scheduleWithFixedDelay(() -> {
    System.out.println("yy");
}, 2, 3, TimeUnit.SECONDS);
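The claim that an uncaught exception stops a fixed-rate task is easy to check. In this sketch (the class name, the 10 ms period and the "fail on the 3rd run" rule are all made up), the task throws on its 3rd run. The ScheduledFuture of a periodic task only completes when the task fails or is cancelled, so get() returns at that point with an ExecutionException, and no further runs follow.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduleDemo {
    // Schedules a task that throws on its 3rd run; returns how many times it ran in total
    static int runsBeforeFailure() throws Exception {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> f = service.scheduleAtFixedRate(() -> {
                if (runs.incrementAndGet() == 3) {
                    throw new IllegalStateException("boom");   // uncaught: later runs are suppressed
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                f.get();   // blocks until the periodic task fails or is cancelled
            } catch (ExecutionException e) {
                System.out.println("stopped by: " + e.getCause());   // java.lang.IllegalStateException: boom
            }
            Thread.sleep(100);   // leave time for any further runs (there should be none)
            return runs.get();
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runsBeforeFailure());   // 3
    }
}
```

If a periodic job should survive occasional failures, catch exceptions inside the task body so they never escape to the scheduler.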
A difference worth noting
Both of the following create a pool that keeps only one thread. What is the difference?
Executors.newFixedThreadPool(1);
Executors.newSingleThreadExecutor();
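Executors.newFixedThreadPool(1) returns a ThreadPoolExecutor object. Cast back to ThreadPoolExecutor, it exposes many methods, including ones that change the pool's properties, such as the core pool size.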
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
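Executors.newSingleThreadExecutor(), by contrast, returns a wrapper around a ThreadPoolExecutor. The wrapper only exposes the basic ExecutorService methods, so the pool's properties can no longer be changed; the Javadoc describes this as a guarantee that the executor cannot be reconfigured to use additional threads.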
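A quick sketch that makes the difference observable: only the fixed pool is a ThreadPoolExecutor, and after a cast it can be grown to more threads, silently breaking any "one task at a time" assumption. The class name is hypothetical.

```java
import java.util.concurrent.*;

class SingleVsFixed {
    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        ExecutorService single = Executors.newSingleThreadExecutor();

        System.out.println(fixed instanceof ThreadPoolExecutor);    // true
        System.out.println(single instanceof ThreadPoolExecutor);   // false: it is a wrapper

        // The fixed pool can be cast and reconfigured, so it is no longer single-threaded
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) fixed;
        tpe.setMaximumPoolSize(4);   // raise the maximum first, then the core size
        tpe.setCorePoolSize(4);
        System.out.println(tpe.getCorePoolSize());                  // 4

        fixed.shutdown();
        single.shutdown();
    }
}
```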
Future
Submitting a task with ExecutorService.submit() returns a Future object; future.get() retrieves the result.
How it works
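A Callable or Runnable task is wrapped in a FutureTask and handed to execute(), so it goes through the same flow described above (a new thread or the queue). A pool thread runs it and stores the result in the FutureTask; future.get() then reads that stored result.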
// submit():
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
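    // Wrap the task; RunnableFuture extends both Runnable and Future
    RunnableFuture<T> ftask = newTaskFor(task);
    // Hand ftask to execute(); the worker thread stores the result in ftask when it finishes
    execute(ftask);
    // Return ftask to the caller as the Future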
    return ftask;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
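Because a FutureTask is both a Runnable and a Future, the mechanism can be shown without a pool at all. In this sketch a plain Thread plays the role of the pool worker: run() stores the result inside the task, and get() blocks until it is there. The class name is made up.

```java
import java.util.concurrent.*;

class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        // The same kind of object newTaskFor() creates inside submit()
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        System.out.println(task.isDone());   // false: nothing has run yet

        Thread worker = new Thread(task);    // stands in for a pool worker thread
        worker.start();

        System.out.println(task.get());      // 42: blocks until run() has stored the result
        System.out.println(task.isDone());   // true
    }
}
```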
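Methods of the Future interface:
get() blocks until the result is available and can throw several exceptions; there is also a variant with a timeout, get(timeout, unit).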
public interface Future<V> {
    /**
     * Attempts to cancel a task that has been submitted to the pool
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * Whether the task was cancelled before it completed normally
     */
    boolean isCancelled();
    /**
     * Whether the task is done (completed normally, threw an exception, or was cancelled)
     */
    boolean isDone();
    /**
     * Blocks until the result is available.
     * Note the exceptions get() can throw:
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;
    /**
     * get() with a timeout; additionally throws TimeoutException if the wait times out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
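A sketch of handling each outcome of get(timeout, unit). It assumes you want a short label per outcome; the helper name outcome is hypothetical. The task's own exception arrives as the cause of the ExecutionException. On timeout the task keeps running unless you cancel it, so the sketch calls cancel(true).

```java
import java.util.concurrent.*;

class FutureGetDemo {
    // Returns a short label describing how future.get(timeout) ended
    static String outcome(ExecutorService pool, Callable<String> task, long timeoutMs) {
        Future<String> future = pool.submit(task);
        try {
            return "result: " + future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();   // the task's exception is the cause
        } catch (TimeoutException e) {
            future.cancel(true);                              // stop waiting and interrupt the task
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();               // preserve the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(outcome(pool, () -> "ok", 1000));   // result: ok
        System.out.println(outcome(pool,
                () -> { throw new IllegalStateException("bad input"); }, 1000));   // failed: bad input
        System.out.println(outcome(pool,
                () -> { Thread.sleep(5000); return "late"; }, 100));            // timed out
        pool.shutdownNow();
    }
}
```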
Future Demo
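// Bounded queue of 100; when saturated, CallerRunsPolicy makes the calling thread run the query itself
private final ExecutorService querySkuListExecutorService = new ThreadPoolExecutor(4, 8,
        60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());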
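// Queries SKUs in parallel batches of 200 and merges the results in submission order
public List<SkuInfo> multiQuerySkuList(List<Long> skuIds) {
    List<SkuInfo> result = new ArrayList<>();
    List<Future<List<SkuInfo>>> futureList = new ArrayList<>();
    // Guava's Lists.partition splits the IDs into consecutive sublists of at most 200
    List<List<Long>> partition = Lists.partition(skuIds, 200);
    for (List<Long> tempSkuIds : partition) {
        // One task per batch; submit() returns immediately with a Future
        Future<List<SkuInfo>> future = querySkuListExecutorService.submit(() -> querySkuList(tempSkuIds));
        futureList.add(future);
    }
    for (Future<List<SkuInfo>> future : futureList) {
        try {
            // Blocks until this batch is done
            result.addAll(future.get());
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();   // restore the interrupt flag
            }
            log.warn("Failed to query product info", e);
            throw new WicsBusinessException(ResultCodeConstants.GATEWAY_ERROR, "Failed to query product info");
        }
    }
    return result;
}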
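The demo above depends on project classes (SkuInfo, querySkuList, WicsBusinessException). Here is a self-contained sketch of the same fan-out/fan-in shape for experimenting. queryBatch is a hypothetical stand-in that just maps each ID to a string, and the batches are cut with List.subList instead of Guava's Lists.partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class BatchQueryDemo {
    // Hypothetical stand-in for querySkuList: "looks up" each ID in a batch
    static List<String> queryBatch(List<Long> ids) {
        List<String> out = new ArrayList<>();
        for (Long id : ids) {
            out.add("sku-" + id);
        }
        return out;
    }

    // Fan out one task per batch, then fan in by calling get() on each Future in order
    static List<String> multiQuery(ExecutorService pool, List<Long> ids, int batchSize)
            throws InterruptedException, ExecutionException {
        List<Future<List<String>>> futures = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += batchSize) {
            List<Long> batch = ids.subList(from, Math.min(from + batchSize, ids.size()));
            futures.add(pool.submit(() -> queryBatch(batch)));
        }
        List<String> result = new ArrayList<>();
        for (Future<List<String>> f : futures) {
            result.addAll(f.get());   // submission order, not completion order
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Long> ids = new ArrayList<>();
        for (long i = 1; i <= 450; i++) {
            ids.add(i);
        }
        List<String> skus = multiQuery(pool, ids, 200);   // 3 batches: 200 + 200 + 50
        System.out.println(skus.size() + " " + skus.get(0) + " " + skus.get(449));   // 450 sku-1 sku-450
        pool.shutdown();
    }
}
```

Because the results are read in submission order, one slow early batch delays the whole merge; that is the situation the CompletionService below addresses.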
CompletionService
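When results come from several sources and you want whichever finishes first, use a CompletionService: completed tasks are placed on its queue in completion order, so the fastest result is available first.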
How it works
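ExecutorCompletionService wraps an Executor and holds a queue of Future objects. When a worker thread finishes a task, the task's Future is added to that queue, so tasks that finish earlier come first.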
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    // Wrap the executor
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    // Queue that receives the completed futures
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    // Wrap the task again; QueueingFuture's done() puts it on the completion queue
    executor.execute(new QueueingFuture(f));
    return f;
}
private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    // Called when the task completes (normally, exceptionally, or by cancellation), so futures are queued in completion order
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
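A sketch that shows completion order directly. The sleep durations stand in for sources of different speed and are made up; the tasks are submitted slowest first, yet take() hands them back fastest first. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class CompletionOrderDemo {
    // Submits one task per delay and returns the results in the order take() yields them
    static List<String> takeAll(long... delaysMs) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMs.length);
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        try {
            for (long d : delaysMs) {
                ecs.submit(() -> {
                    Thread.sleep(d);          // simulated source latency
                    return d + "ms";
                });
            }
            List<String> order = new ArrayList<>();
            for (int i = 0; i < delaysMs.length; i++) {
                order.add(ecs.take().get());   // take() blocks until the next task finishes
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(takeAll(800, 400, 10));   // [10ms, 400ms, 800ms]
    }
}
```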
CompletionService Demo
void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures
         = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 // take() returns the next completed future; get() retrieves its value
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     // Got a result: stop waiting; the remaining futures are cancelled in finally
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
     finally {
         // Cancel the remaining futures to save resources
         for (Future<Result> f : futures)
             f.cancel(true);
     }
     if (result != null)
         use(result);
 }