Java线程池
线程池的实现原理用一句话理解:维持多个线程和一个阻塞队列。队列里的放的是待执行任务,线程从队列里循环获取任务并执行。
初始化线程池的属性
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
运行机制
execute(Runnable cmd) 流程:
- 初始一个任务过来,核心线程数从0+1;
- 再来一个任务如果当前线程数少于设置的核心线程数,则+1;
- 当核心线程数达到上限,再来任务先放到队列,当队列满了,则增加线程保证线程总数少于最大线程数
- 超过核心线程数的这些线程,当闲置超过keepAliveTime,则会结束
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
拒绝策略
-
直接抛拒绝异常 new ThreadPoolExecutor.AbortPolicy()
-
调用线程来执行 new ThreadPoolExecutor.CallerRunsPolicy()
说白了就是调用submit()方法所在的线程
-
直接丢弃该任务 new ThreadPoolExecutor.DiscardPolicy()
-
丢弃最老的任务,并执行当前的 new ThreadPoolExecutor.DiscardOldestPolicy()
自定义一个线程池
这里使用了自定义拒绝策略,实际也是丢弃最老的并执行当前的。但发生该拒绝策略时,可以做些事情,如打印下想打的日志。
new ThreadPoolExecutor(4, 8,
60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "my-pool-" + threadNumber.getAndIncrement());
}
},
(r, executor) -> {
log.warn("reject: Task " + r.toString() + " rejected from " + executor.toString());
if (!executor.isShutdown()) {
executor.getQueue().poll();
executor.execute(r);
}
})
ThreadFactory 参数,可以引入Guava包更简洁如:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("my-pool-%d").build();
线程池对外API
线程池构造函数中的参数变更和获取
如动态修改线程池的线程数、生成线程、拒绝策略等
1. void setCorePoolSize(int corePoolSize);
2. void setMaximumPoolSize(int maximumPoolSize);
3. void setKeepAliveTime(long time, TimeUnit unit);
4. void setThreadFactory(ThreadFactory threadFactory);
5. void setRejectedExecutionHandler(RejectedExecutionHandler handler);
6. 以上同样支持 get() 这些参数
线程池监控项
1. BlockingQueue<Runnable> getQueue()
2. int getPoolSize()
3. int getActiveCount()
4. int getLargestPoolSize()
5. long getTaskCount()
6. long getCompletedTaskCount()
线程池的关闭
// 平滑有序关闭线程池,与此同时,已提交的任务将继续执行,但不再接收新的任务,如果线程池已关闭,此方法调用不会产生额外影响。
// 执行该方法不会阻塞等待已提交的任务执行完毕
1. void shutdown();
// 尝试终止(interrupt())所有正在执行的任务,并停止处理等待队列中的的任务,最后将所有未执行的任务列表的形式返回,此方法会将任务队列中的任务移除并以列表形式返回。
// 执行该方法不会阻塞 等待已提交的任务执行完毕
2. List<Runnable> shutdownNow();
// 方法执行时,阻塞等待一个时间 即线程池执行任务。超时前都关了,返回true;否则返回false
3. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
4. boolean isShutdown();
5. boolean isTerminating();
6. boolean isTerminated();
关闭线程池,通常shutdown() 和 awaitTermination() 配合执行:shutdown 关闭,awaitTermination 来等待关闭。然后再执行后续动作。
线程池预热
// 预热启动一个核心线程
1. boolean prestartCoreThread();
// 预热启动所有核心线程
2. int prestartAllCoreThreads();
几个常见的实现类
ExecutorService只是接口,Java标准库提供的几个常用实现类有:
- FixedThreadPool:线程数固定的线程池
new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
- CachedThreadPool:线程数根据任务动态调整的线程池
每个任务一个执行线程,最大是线程数 Integer.MAX_VALUE 适合资源充足,当然一般不建议这么用。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
- SingleThreadExecutor:仅单线程执行的线程池
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
- newScheduledThreadPool: 调度任务线程池
创建一个线程池,按给定延迟时间或者周期时间 执行任务命令
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
// 固定频率执行任务,如果某个任务抛出异常未捕获,则会放弃执行后续
// 某个任务执行时间超过这个周期频率,则会延迟执行,不会两个任务并发跑
// 比如任务固定耗时5s,周期是2s,那么等于每次会延迟5s跑一次
ScheduledFuture<?> xx = service.scheduleAtFixedRate(() -> {
System.out.println("xx");
}, 2, 3, TimeUnit.SECONDS);
// 支持取消任务调度
boolean cancel = xx.cancel(true);
// 按固定延迟时间执行,前一个任务结束后,间隔固定时间再跑下一个任务
ScheduledFuture<?> yy = service.scheduleWithFixedDelay(() -> {
System.out.println("yy");
}, 2, 3, TimeUnit.SECONDS);
值得注意的区别
维持线程池里只有一个线程,可以如下这样做 区别是什么?
Executors.newFixedThreadPool(1);
Executors.newSingleThreadExecutor();
Executors.newFixedThreadPool(1); 返回值是个ThreadPoolExecutor对象,这个对象对外暴露很多方法,包括修改核心线程数等线程池的属性变更操作。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
而 Executors.newSingleThreadExecutor(); 获取的是个包装类,包装了ThreadPoolExecutor,这个包装类只对外提供了 ExecutorService这个接口的基本实现,不能再修改线程池的属性了。
Future
ExecutorService提交任务后,会返回一个Future对象。future.get()拿到结果。
原理
Callable或者Runnable任务提交到线程池队列(如上文运行机制),池里的线程从队列拿到任务执行,把执行结果set 到FutureTask结果属性,当future.get()时,从这个结果拿值。
// submit():
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 把task包装了下,RunnableFuture继承了 Runnable, Future
RunnableFuture<T> ftask = newTaskFor(task);
// ftask提交到队列,线程执行完会把结果set到ftask属性里
execute(ftask);
// 返回给用户的future
return ftask;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
Future接口方法:
get() 阻塞等待,并能抛出各异常;支持带超时get(timeout)。
public interface Future<V> {
/**
* 取消已经提交到线程池的任务
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 任务完成前是否取消了
*/
boolean isCancelled();
/**
* 任务是否完成了
*/
boolean isDone();
/**
* 阻塞等待
* get() 注意能抛出以下异常
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* 带超时的get(),额外有超时异常
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future Demo
querySkuListExecutorService = new ThreadPoolExecutor(4, 8,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
public List<SkuInfo> multiQuerySkuList(List<Long> skuIds) {
List<SkuInfo> result = new ArrayList<>();
List<Future<List<SkuInfo>>> futureList = new ArrayList<>();
List<List<Long>> partition = Lists.partition(skuIds, 200);
for (List<Long> tempSkuIds : partition) {
Future<List<SkuInfo>> future = querySkuListExecutorService.submit(() -> querySkuList(tempSkuIds));
futureList.add(future);
}
for (Future<List<SkuInfo>> future : futureList) {
try {
result.addAll(future.get());
} catch (Exception e) {
log.warn("查询商品信息失败", e);
throw new WicsBusinessException(ResultCodeConstants.GATEWAY_ERROR, "查询商品信息失败");
}
}
return result;
}
CompletionService
如果从多个来源获取结果,想拿到第一个最快返回的,使用CompletionService更快。 因为更快的返回先放到CompletionService的队列里。
原理
CompletionService包装了Executor,并包含一个放Future对象的队列。 任务被线程池的工作线程执行完成后,会把执行完的future对象放到队列,这样先执行完成的任务就在队列前。
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
// 包装了executor
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
// 放future对象的队列
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 把任务再包装一下,内部done()方法,会把任务放到队列
executor.execute(new QueueingFuture(f));
return f;
}
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 线程执行完会调用此,把先执行完的任务放到队列
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
CompletionService Demo
void solve(Executor e,
Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs
= new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures
= new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
// take()后是队列的future对象,get()获取返回值
Result r = ecs.take().get();
if (r != null) {
result = r;
// 拿到结果退出,其他future对象取消
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
// 取消其他future,节约资源
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}