Executor
Executor
位于java.util.concurrent
包中,主要作用是解耦任务的提交和任务的执行细节。通过使用Executor
,我们不需要关心任务是在哪个线程中执行、如何调度等细节。我们只需要将任务提交给Executor
,由它来负责执行。
Executor
接口中只有一个方法:
void execute(Runnable command);
这个方法接受一个Runnable
对象作为参数,表示要执行的任务。当我们调用这个方法时,Executor
会在某个时间点执行这个任务。
任务的执行可能在一个新的线程、一个线程池中的线程,或者是在调用这个方法的线程中,这取决于Executor
的具体实现。
ExecutorService
ExecutorService
继承自Executor
接口,主要作用是提供管理任务终止的方法,以及可以生成用于跟踪一个或多个异步任务进度的Future
。
通过使用ExecutorService
,我们可以控制任务的提交和执行,包括启动、关闭和获取任务的结果等。
ExecutorService
接口中定义了以下方法:
方法名 | 描述 |
---|---|
shutdown() | 发起一个有序的关闭过程,执行已经提交的任务,但不接受新任务。如果已经关闭,则调用该方法没有额外效果。 |
shutdownNow() | 尝试停止所有正在执行的任务,并阻止等待的任务开始执行,同时返回等待执行的任务列表。该方法不会等待正在执行的任务终止。 |
isShutdown() | 判断线程池是否已关闭。 |
isTerminated() | 判断关闭后所有任务是否已完成。注意,只有调用shutdown() 或shutdownNow() 后,该方法才会返回true 。 |
awaitTermination(long timeout, TimeUnit unit) | 阻塞当前线程,直到所有任务在关闭请求后完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。 |
submit(Callable<T> task) | 提交一个返回结果的任务,并返回一个Future 对象,通过该对象的get() 方法可以获取任务的结果。 |
submit(Runnable task, T result) | 提交一个Runnable 任务,并指定结果,返回一个Future 对象,通过该对象的get() 方法可以获取指定的结果。 |
submit(Runnable task) | 提交一个Runnable 任务,返回一个Future 对象,通过该对象的get() 方法可以获取null 。 |
invokeAll(Collection<? extends Callable<T>> tasks) | 执行给定任务集合中的所有任务,并返回一个Future 列表,每个Future 表示一个任务的执行状态和结果。该方法会阻塞直到所有任务都完成。 |
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 执行给定任务集合中的所有任务,并返回一个Future 列表,每个Future 表示一个任务的执行状态和结果。该方法会阻塞直到所有任务都完成或发生超时,如果发生超时,未完成的任务将被取消。 |
invokeAny(Collection<? extends Callable<T>> tasks) | 执行给定任务集合中的任意一个任务,并返回第一个成功完成的任务的结果。该方法会阻塞直到有一个任务成功完成。 |
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 执行给定任务集合中的任意一个任务,并返回第一个成功完成的任务的结果。该方法会阻塞直到有一个任务成功完成或发生超时,如果发生超时,未完成的任务将被取消。 |
ScheduledExecutorService
这个接口ScheduledExecutorService
继承自ExecutorService
接口,并添加了定时任务执行的功能。
接口定义
ScheduledExecutorService
是一个可以安排命令在给定延迟后运行,或者定期执行的ExecutorService
。
主要功能
- 延迟执行任务:通过
schedule
方法,可以提交一个任务,使其在指定的延迟时间后执行。 - 定期执行任务:
scheduleAtFixedRate
:提交一个任务,使其在初始延迟后开始执行,然后以固定频率重复执行。scheduleWithFixedDelay
:提交一个任务,使其在初始延迟后开始执行,然后每次执行结束后等待固定延迟时间,再进行下一次执行。
在ScheduledExecutorService
时,需要注意以下事项:
- 相对延迟:所有的
schedule
方法接受的延迟和周期参数都是相对于当前时间的,而不是绝对时间或日期。 - 零和负延迟:零和负延迟(但不是周期)在
schedule
方法中是允许的,会被视为立即执行的请求。 - 任务取消:通过
schedule
方法返回的ScheduledFuture
对象可以用来取消任务或检查任务执行状态。
方法说明
schedule(Runnable command, long delay, TimeUnit unit)
:- 提交一个一次性任务,在给定延迟后执行。
- 返回一个
ScheduledFuture
对象,该对象的get()
方法在任务完成后返回null
。
schedule(Callable<V> callable, long delay, TimeUnit unit)
:- 提交一个有返回值的一次性任务,在给定延迟后执行。
- 返回一个
ScheduledFuture
对象,可以用来获取结果或取消任务。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:- 提交一个周期性任务,在初始延迟后开始执行,然后以固定频率重复执行。
- 如果任务执行时间超过周期,下一次执行将延后开始。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:- 提交一个周期性任务,在初始延迟后开始执行,每次执行结束后等待固定延迟时间再进行下一次执行。
AbstractExecutorService
AbstractExecutorService
实现了ExecutorService
接口,主要作用是提供ExecutorService
接口中定义的submit
、invokeAny
和invokeAll
方法的默认实现。这些方法用于提交任务并获取任务的执行结果。
该类定义了以下几个重要的方法:
方法名 | 描述 |
---|---|
newTaskFor(Runnable runnable, T value) | 返回一个RunnableFuture 对象,该对象在执行时会运行给定的runnable 任务,并返回给定的value 作为结果。 |
newTaskFor(Callable<T> callable) | 返回一个RunnableFuture 对象,该对象在执行时会调用给定的callable 任务,并返回该任务的执行结果。 |
submit(Runnable task) | 提交一个Runnable 任务,并返回一个Future 对象。这个Future 对象可以用来获取任务的执行结果或取消任务的执行。 |
submit(Runnable task, T result) | 提交一个Runnable 任务,并指定一个结果,返回一个Future 对象。这个Future 对象可以用来获取指定的结果或取消任务的执行。 |
submit(Callable<T> task) | 提交一个Callable 任务,并返回一个Future 对象。这个Future 对象可以用来获取任务的执行结果或取消任务的执行。 |
invokeAny(Collection<? extends Callable<T>> tasks) | 执行给定任务集合中的任意一个任务,并返回第一个成功完成的任务的结果。 |
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 执行给定任务集合中的任意一个任务,并返回第一个成功完成的任务的结果。如果在指定的时间内没有任务完成,则抛出TimeoutException 。 |
invokeAll(Collection<? extends Callable<T>> tasks) | 执行给定任务集合中的所有任务,并返回一个Future 列表,每个Future 表示一个任务的执行状态和结果。 |
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 执行给定任务集合中的所有任务,并返回一个Future 列表,每个Future 表示一个任务的执行状态和结果。如果在指定的时间内没有任务完成,则抛出TimeoutException 。 |
submit
实现
查看代码
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
两个newTaskFor
方法用于创建FutureTask
实例。FutureTask
是一个实现了RunnableFuture
接口的类,它可以作为一个Runnable
来执行,同时也是一个Future
,可以用来获取执行结果或取消任务。
submit
方法利用了FutureTask
来包装Runnable
和Callable
任务,使得它们可以在异步执行的同时提供对任务状态和结果的访问。
submit
和execute
的区别
查看代码
/* execute 是Executor接口仅有的一个方法 */
public interface Executor {
void execute(Runnable command);
}
/* submit 是AbstractExecutorService抽象类的方法,该类实现了Executor接口 */
public abstract class AbstractExecutorService implements ExecutorService {
// ...
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// ...
}
submit
方法是execute
方法的增强版,它提供了任务结果的获取能力,并且在内部通过调用execute
方法来实现任务的提交。
- 功能目的:
execute(Runnable command)
:这是最基础的提交任务方法,用于将一个Runnable
任务提交给线程池执行。它不返回任何结果,也不提供检查任务执行状态的能力。submit(Runnable task)
和submit(Runnable task, T result)
:这两个submit
方法都是用于提交任务,但它们返回一个Future
对象,允许调用者检查任务的执行状态,并且能够获取任务执行的结果(如果提供了结果)。
- 返回值:
execute
方法不返回任何值。submit
方法返回一个Future
对象,这个对象可以用来跟踪任务的执行状态和结果。
- 实现关系:
- 在内部实现上,
submit
方法实际上调用了execute
方法来将任务提交给线程池。submit
方法首先将Runnable
任务包装成一个RunnableFuture
(通过newTaskFor
方法new
一个FutureTask
实现类),然后调用execute
方法来执行这个RunnableFuture
。 - 这意味着
submit
方法在功能上是execute
方法的封装,提供了额外的功能,如任务结果的获取。
- 在内部实现上,
- 异常处理:
execute
方法在任务无法执行时(例如线程池已关闭或达到容量上限)会抛出RejectedExecutionException
。submit
方法同样会在任务无法执行时抛出RejectedExecutionException
,但由于它返回Future
对象,调用者还可以通过Future
对象来检查任务是否被拒绝。
ThreadPoolExecutor
ThreadPoolExecutor
扩展自AbstractExecutorService
类,主要作用是提供了一个线程池,用于执行提交的Runnable
或Callable
任务。
线程池可以有效地管理多个线程,减少任务执行的开销,并提供资源限制和管理的机制。
属性
常量
这些常量主要定义了线程池的状态以及如何在一个整数中编码这些状态和线程的数量,用于方便属性ctl
值的可读性。
常量名 | 描述 |
---|---|
COUNT_BITS | 这是一个静态常量,表示整数中用于表示线程数量的位数。由于整数的大小是32位,减去高3位用于表示状态,剩下的29位用于表示线程数量。 |
COUNT_MASK | 这是一个静态常量,用于从整数中提取线程数量的二进制掩码。这个掩码通过将整数右移COUNT_BITS 位并减去1得到。 |
RUNNING | 这是一个静态常量,表示线程池处于运行状态。它的值是-1 << COUNT_BITS ,即所有位都为1,除了高3位为0。 |
SHUTDOWN | 这是一个静态常量,表示线程池正在关闭,但仍在执行已提交的任务。它的值是0 << COUNT_BITS ,即高3位为0,低29位为0。 |
STOP | 这是一个静态常量,表示线程池已经关闭,并且尝试中断所有正在执行的任务。它的值是1 << COUNT_BITS ,即高3位为1,低29位为0。 |
TIDYING | 这是一个静态常量,表示所有任务都已经执行完毕,线程池正在清理状态。它的值是2 << COUNT_BITS ,即高3位为10,低29位为0。 |
TERMINATED | 这是一个静态常量,表示线程池已经终止。它的值是3 << COUNT_BITS ,即高3位为11,低29位为0。 |
静态属性
属性名 | 描述 |
---|---|
defaultHandler | 这是一个静态的RejectedExecutionHandler 实例,用于定义当线程池无法处理新任务时的默认处理策略。默认策略是AbortPolicy ,它会抛出RejectedExecutionException 。 |
shutdownPerm | 这是一个RuntimePermission 类型的静态属性,它表示修改线程的运行时权限。当调用线程池的关闭方法时,会检查是否具有这个权限。如果没有,可能会抛出SecurityException 。 |
主要属性
属性/方法名 | 描述 |
---|---|
ctl | 这是一个原子整数,用于表示线程池的状态和线程数量。这个整数的高3位表示线程池的状态,低29位表示线程数量。ctl 的初始值是ctlOf(RUNNING, 0) 。 |
workQueue | 这是一个阻塞队列,用于存放待执行的任务。线程池将提交的任务放入这个队列中,然后从队列中取出任务并执行。 |
mainLock | 这是一个重入锁,用于访问工作线程集和相关记录。当线程池需要修改工作线程集或进行其他维护操作时,需要获取这个锁。 |
workers | 这是一个哈希集,包含所有工作线程。工作线程是线程池中的实际执行者,它们负责从队列中取出任务并执行。 |
termination | 这是一个条件对象,用于支持awaitTermination 方法。当线程池需要等待所有任务执行完毕时,会使用这个条件对象。 |
largestPoolSize | 这是一个整数,记录线程池达到过的最大大小。这个大小是指线程池中同时存在的最大线程数量。 |
completedTaskCount | 这是一个长整数,记录完成的任务数量。每当一个任务被执行完毕,这个计数器就会增加。 |
threadFactory | 这是一个线程工厂,用于创建新线程。当线程池需要创建新线程时,会使用这个工厂。 |
handler | 这是一个RejectedExecutionHandler 类型的属性,用于定义当线程池饱和或正在关闭时,无法执行新任务的处理策略。 |
keepAliveTime | 这是一个长整数,表示线程空闲时间。当线程池中的线程空闲时间超过这个值时,线程会被终止。 |
allowCoreThreadTimeOut | 这是一个布尔值,表示是否允许核心线程超时。如果设置为true ,核心线程在空闲时也会根据keepAliveTime 的设置来超时并终止。 |
corePoolSize | 这是一个整数,表示核心线程数。线程池至少会保留这个数量的线程,即使这些线程在空闲状态下。 |
maximumPoolSize | 这是一个整数,表示最大线程数。线程池最多会创建这个数量的线程。如果线程池中的线程数超过这个值,新的任务会被添加到任务队列中等待执行。 |
内部类
Worker
类
查看代码
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
/** Initial task to run. Possibly null. */
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker
类扩展了AbstractQueuedSynchronizer
(AQS)并实现了Runnable
接口。这意味着Worker
本身是一个可以被线程执行的任务。
Worker
类主要用于封装一个工作线程的状态和行为,这些状态包括与该线程关联的任务队列、已完成的任务数等。
从构造函数可以看出,创建一个新的Worker
实例会初始化firstTask
(可能是null
,表示没有初始任务),并通过线程工厂创建一个线程,并将当前Worker
对象即this
作为任务传递给这个线程。
这个任务就是Worker
对象的run()
方法,里面只是调用了外部ThreadPoolExecutor
类的一个方法runWorker(this)
。在runWorker
方法中,Worker
进入一个循环,不断地从任务队列中获取任务执行,以实现线程复用。
生命周期
Worker
类的生命周期在Java线程池中大致可以划分为以下几个阶段:
创建阶段:
Worker
对象通过线程池的addWorker
方法创建,这个过程中会调用Worker
的构造函数。在构造函数中,会初始化
firstTask
(可能是null
,表示没有初始任务),并通过线程工厂创建一个线程,并将当前Worker
对象作为任务传递给这个线程。构造函数中还会将同步状态设置为
-1
,以防止线程在启动前被中断。启动阶段:
创建完成后,当线程池启动这个
Worker
,即调用thread.start()
时,会调用Worker
的run
方法。run
方法内部会调用线程池的runWorker
方法,这是Worker
开始执行任务的地方。运行阶段:
在
runWorker
方法中,Worker
进入一个循环,不断地从任务队列中获取任务执行。如果
firstTask
不为null
,则会先执行这个初始任务。之后,
Worker
会调用getTask
方法从任务队列中获取新的任务来执行。每次循环中,
Worker
会尝试获取独占锁,以确保在执行任务期间不会被中断。执行任务,并在任务执行完毕后释放锁。
任务执行完毕:
每个任务执行完毕后,
Worker
会更新它完成的任务数completedTasks
。如果任务执行过程中发生异常,线程池会处理这些异常,并可能决定终止这个
Worker
。终止阶段:
如果
Worker
由于以下原因之一需要终止:- 线程池正在关闭。
Worker
在执行任务时出现异常。Worker
由于空闲时间过长(例如,线程池允许核心线程超时)被终止。
终止过程中,
Worker
会退出runWorker
方法的循环,并执行清理操作。线程会结束
run
方法的执行,从而结束线程的生命周期。清理阶段:
当
Worker
的线程结束时,JVM会回收这个线程的资源。如果线程池完全关闭,它会清理所有
Worker
对象,并释放它们占用的资源。
拒绝策略类
CallerRunsPolicy
当线程池的线程数达到最大值并且工作队列已满时,该策略会将任务回退到提交任务的线程中执行,也就是调用execute
方法的线程。
这个策略避免了任务被直接拒绝,而是让调用者线程自己执行任务,从而降低线程池的压力。
这是一种“反馈控制”机制,能够减慢任务提交的速度。
AbortPolicy
查看代码
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
这是ThreadPoolExecutor
的默认拒绝策略。
当线程池无法处理新任务时,它会抛出RejectedExecutionException
异常。
这个策略意在立即通知调用者线程池无法处理任务,需要调用者的响应。
DiscardPolicy
当线程池无法处理新任务时,这个策略简单地丢弃任务,不做任何处理。
这种策略适用于那些对任务执行结果不关心或者允许任务丢失的场景。
DiscardOldestPolicy
当线程池无法处理新任务时,这个策略会尝试丢弃队列中最老的任务,然后尝试重新提交新任务。
这种策略认为新任务比旧任务更重要,因此选择丢弃旧任务来为新任务腾出空间。
这种策略可能会造成旧任务的丢失,因此在使用时需要确保这种丢失是可以接受的。
方法
shutdown()
方法
shutdown()
方法首先确保线程池可以被安全地关闭,然后设置线程池状态,中断空闲线程,处理计划任务,最后尝试终止线程池。这个过程确保了线程池的平滑关闭,同时处理了所有必要的清理工作。
shutdown() 方法:
查看代码
javapublic void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
这个方法是关闭线程池的入口点。它首先获取一个全局锁
mainLock
,确保在执行关闭操作时线程池的状态不会被其他线程修改。checkShutdownAccess() 方法:
查看代码
javaprivate void checkShutdownAccess() { // assert mainLock.isHeldByCurrentThread(); @SuppressWarnings("removal") SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); for (Worker w : workers) security.checkAccess(w.thread); } }
在获取锁之后,这个方法检查当前线程是否有权限关闭线程池。如果有安全管理器,它将检查是否有关闭线程池的权限,以及是否有权限中断所有工作线程。
advanceRunState(int targetState) 方法:
javaprivate void advanceRunState(int targetState) { // assert targetState == SHUTDOWN || targetState == STOP; for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
这个方法将线程池的运行状态设置为
SHUTDOWN
。它使用一个循环和compareAndSet
来保证状态更新的原子性。interruptIdleWorkers() 方法:
javaprivate void interruptIdleWorkers() { interruptIdleWorkers(false); }
这个方法调用
interruptIdleWorkers(false)
,中断所有空闲的工作线程,即那些在等待任务的工作线程。interruptIdleWorkers(boolean onlyOne) 方法:
查看代码
javaprivate void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
这个方法遍历所有工作线程,并尝试中断那些空闲的线程。如果
onlyOne
参数为true
,则只中断一个线程。onShutdown() 方法:
查看代码
java@Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : (!keepDelayed && t.getDelay(NANOSECONDS) > 0)) || t.isCancelled()) { if (q.remove(t)) t.cancel(false); } } } tryTerminate(); }
这个方法是一个钩子方法,主要用于
ScheduledThreadPoolExecutor
。它根据线程池的关闭策略,处理队列中的计划任务。如果策略决定取消某些任务,那么这些任务将被取消。tryTerminate() 方法:
查看代码
javafinal void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
tryTerminate()
方法尝试将线程池的状态从SHUTDOWN
转换到TIDYING
,然后到TERMINATED
。- 检查当前状态:方法首先检查线程池的当前状态,如果线程池还在运行 (
isRunning(c)
),或者已经处于TIDYING
或TERMINATED
状态,或者线程池处于SHUTDOWN
状态但队列中还有任务 (! workQueue.isEmpty()
),则方法直接返回,不进行终止。 - 中断空闲线程:如果工作线程数不为零 (
workerCountOf(c) != 0
),意味着有线程可能在执行任务,方法会中断空闲线程(interruptIdleWorkers(ONLY_ONE)
),然后返回。 - 设置 TIDYING 状态:如果上述条件都不满足,即没有运行中的线程且队列中没有任务,方法会尝试将线程池状态设置为
TIDYING
。这通过compareAndSet
方法原子性地完成。 - 执行 terminated() 方法:如果状态成功设置为
TIDYING
,方法将调用terminated()
方法,这是一个钩子方法,可以被子类覆盖以进行清理工作。 - 设置 TERMINATED 状态:在
terminated()
方法执行完毕后,线程池的状态被设置为TERMINATED
,并通知所有等待线程池终止的线程(termination.signalAll()
)。 - 解锁:最后,释放
mainLock
锁。 - 重试:如果
compareAndSet
操作失败,循环将重试,直到成功。
- 检查当前状态:方法首先检查线程池的当前状态,如果线程池还在运行 (
整个 shutdown()
方法的执行流程可以总结为:
- 锁定主锁以保护状态变更。
- 检查关闭权限。
- 设置线程池状态为
SHUTDOWN
。 - 中断所有空闲的工作线程。
- 对于
ScheduledThreadPoolExecutor
,处理计划任务。 - 尝试终止线程池,这包括将状态设置为
TIDYING
,执行终止钩子方法,然后将状态设置为TERMINATED
。 - 解锁并通知所有等待线程池终止的线程。
shutdownNow()
方法
shutdownNow()
方法尝试立即停止所有正在执行的任务,并返回尚未执行的任务列表的方法。
shutdownNow() 方法:
查看代码
javapublic List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
这是
shutdownNow()
方法的入口点。它首先获取一个全局锁mainLock
,然后执行以下步骤:checkShutdownAccess() 方法:
查看代码
javaprivate void checkShutdownAccess() { // assert mainLock.isHeldByCurrentThread(); @SuppressWarnings("removal") SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); for (Worker w : workers) security.checkAccess(w.thread); } }
这个方法检查当前线程是否有权限关闭线程池。如果有安全管理器,它会检查关闭线程池的权限,并对每个工作线程执行访问检查。
advanceRunState(int targetState) 方法:
javaprivate void advanceRunState(int targetState) { // assert targetState == SHUTDOWN || targetState == STOP; for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
这个方法将线程池的运行状态设置为
STOP
。它使用一个循环和compareAndSet
原子操作来保证状态更新的原子性。interruptWorkers() 方法:
javaprivate void interruptWorkers() { // assert mainLock.isHeldByCurrentThread(); for (Worker w : workers) w.interruptIfStarted(); }
这个方法遍历所有工作线程并调用它们的
interruptIfStarted()
方法,中断那些已经启动的线程。interruptIfStarted() 方法:
javavoid interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
如果工作线程的状态大于等于0(表示已启动),并且线程不为空且未被中断,则中断该线程。
drainQueue() 方法:
查看代码
javaprivate List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
这个方法排空任务队列,将所有未执行的任务移除并添加到列表
taskList
中。首先使用drainTo()
尽可能多地移除任务,如果队列仍然不为空,则通过遍历队列快照并逐个移除剩余的任务。释放锁:
javamainLock.unlock();
在执行完所有关闭操作后,释放
mainLock
锁。tryTerminate() 方法:
查看代码
javafinal void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
这个方法尝试将线程池的状态从
STOP
转换为TIDYING
,然后到TERMINATED
。如果线程池的状态至少是TIDYING
或者当前有任务在工作,则不进行终止。否则,如果所有任务都已执行完毕且没有工作线程,则设置线程池的状态为TIDYING
,然后调用terminated()
方法,最后将状态设置为TERMINATED
并通知所有等待线程池终止的线程。返回未执行的任务列表:
javareturn tasks;
最后,方法返回从任务队列中排空出来的尚未执行的任务列表。
总结起来,shutdownNow()
方法的执行流程可以总结为:
- 锁定主锁以保护状态变更。
- 检查关闭权限。
- 设置线程池状态为
STOP
。 - 中断所有工作线程。
- 排空任务队列。
- 解锁。
- 尝试终止线程池。
- 返回未执行的任务列表。
两种关闭线程池方法的区别
shutdown()
和shutdownNow()
方法在ThreadPoolExecutor
类中的实现有明显的不同。
查看代码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
- 设置线程池状态:
shutdown()
调用advanceRunState(SHUTDOWN)
,将线程池状态设置为SHUTDOWN
。shutdownNow()
调用advanceRunState(STOP)
,将线程池状态设置为STOP
。
- 中断工作线程:
shutdown()
调用interruptIdleWorkers()
,只中断那些处于空闲状态的工作线程(等待任务的线程)。shutdownNow()
调用interruptWorkers()
,中断所有工作线程,无论它们是否空闲。
- 处理任务队列:
shutdown()
不直接处理任务队列中的任务,允许它们继续执行。shutdownNow()
调用drainQueue()
,排空任务队列,并返回尚未执行的任务列表。
- 返回值:
shutdown()
没有返回值。shutdownNow()
返回一个List<Runnable>
,包含所有从任务队列中排空出来的尚未执行的任务。
onShutdown()
钩子方法:shutdown()
调用onShutdown()
,这是ScheduledThreadPoolExecutor
的一个钩子方法,用于处理关闭时计划任务的行为。shutdownNow()
不调用onShutdown()
。
tryTerminate()
:- 两个方法最后都调用
tryTerminate()
,尝试终止线程池。但由于shutdownNow()
设置了更严格的STOP
状态并中断所有工作线程,它可能导致线程池更快地进入终止状态。
- 两个方法最后都调用
通过这些源码层面的差异,我们可以看到shutdownNow()
比shutdown()
更激进,它试图立即停止所有任务,而shutdown()
则允许线程池中的任务完成执行。这些方法的选择取决于应用程序的需求和对任务执行完整性的要求。
execute
方法
execute
方法是用来提交任务以供将来执行的。下面是execute
方法的执行流程:
- 参数校验:
- 首先检查传入的
Runnable
任务是否为null
,如果是null
,则直接抛出NullPointerException
。
- 首先检查传入的
- 尝试添加核心线程:
- 通过
ctl.get()
获取当前线程池的控制状态。 - 检查当前运行的线程数是否小于
corePoolSize
(核心线程数)。 - 如果小于核心线程数,则尝试通过
addWorker
方法创建一个新的核心线程来执行任务。如果成功,则方法返回,任务执行流程结束。
- 通过
- 尝试将任务加入队列:
- 如果当前线程池状态为运行中(
isRunning(c)
),并且任务成功加入工作队列(workQueue.offer(command)
):- 重新检查线程池的状态,因为可能在线程入队后状态发生了变化。
- 如果线程池不再运行,并且成功从队列中移除任务(
remove(command)
),则通过reject
方法处理该任务。 - 如果在重新检查时发现没有运行的线程(可能之前的线程已经终止),则会尝试添加一个新的非核心线程来确保有线程可以处理队列中的任务。
- 如果当前线程池状态为运行中(
- 尝试添加非核心线程:
- 如果任务不能加入队列,或者队列已满,则尝试通过
addWorker
方法创建一个新的非核心线程来执行任务。 - 如果创建失败(可能是因为线程池已经关闭或者达到了最大线程数
maximumPoolSize
),则通过reject
方法处理该任务。
- 如果任务不能加入队列,或者队列已满,则尝试通过
查看代码
/**
* 提交任务以供将来执行。任务可能会在一个新的线程中执行,或者在一个现有的线程池线程中执行。
* @param command 要执行的任务
*/
public void execute(Runnable command) {
// 检查任务是否为null,如果是则抛出NullPointerException
if (command == null)
throw new NullPointerException();
// 执行流程分为三个步骤:
// 步骤1:如果当前运行的线程数小于核心线程数,尝试启动一个新的线程来执行任务
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 尝试添加一个核心线程执行任务,如果成功则直接返回
if (addWorker(command, true))
return;
// 如果添加失败,重新获取线程池状态
c = ctl.get();
}
// 步骤2:如果任务可以成功加入队列,仍然需要双重检查是否应该添加线程
if (isRunning(c) && workQueue.offer(command)) {
// 重新检查线程池状态
int recheck = ctl.get();
// 如果线程池不再运行且任务从队列中移除成功,则拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作线程数为0,则添加一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 步骤3:如果任务无法加入队列,尝试添加一个新的非核心线程
else if (!addWorker(command, false))
// 如果添加非核心线程失败,则拒绝任务
reject(command);
}
addWorker
方法源码
/**
* 尝试添加一个新的工作线程来执行任务。这个方法会根据线程池的当前状态和配置来决定是否添加线程。
*
* @param firstTask 要执行的任务,如果没有则为null
* @param core 是否是核心线程
* @return 如果成功添加了工作线程则返回true,否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// 使用标签retry来简化在循环中重新检查状态和重试的逻辑
retry:
for (int c = ctl.get();;) {
// 只有在必要时才检查队列是否为空
// 如果线程池至少处于SHUTDOWN状态,并且
// 1) 至少处于STOP状态
// 2) 任务不为null
// 3) 工作队列为空
// 则不添加工作线程
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 无限循环,直到成功增加工作线程计数或确定不添加工作线程
for (;;) {
// 检查当前工作线程数是否超过了核心线程数或最大线程数
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 尝试增加工作线程计数
if (compareAndIncrementWorkerCount(c))
break retry; // 成功增加计数,跳出retry标签的循环
// 重新读取ctl,因为可能其他线程改变了它的值
c = ctl.get();
// 如果线程池至少处于SHUTDOWN状态,则继续外层循环
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 如果CAS操作失败,是因为工作线程计数变化,继续内层循环
}
}
// 标记工作线程是否已启动和添加
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个新的Worker实例,它包含要执行的任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 获取主锁,以便在添加工作线程时保持同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在持有锁的同时重新检查状态
// 如果线程工厂失败或者在获取锁之前关闭,则回滚操作
int c = ctl.get();
// 检查线程池是否正在运行,或者在STOP以下状态且任务为null
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 检查线程是否处于NEW状态
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将Worker添加到工作线程集合中
workers.add(w);
workerAdded = true;
// 更新largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 如果Worker已添加到集合中,则启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果工作线程没有启动,则进行清理
if (! workerStarted)
addWorkerFailed(w);
}
// 返回工作线程是否已启动的标志
return workerStarted;
}
runWorker
方法
runWorker
方法是Java线程池中ThreadPoolExecutor
类的一个关键方法,它负责管理Worker
线程的整个生命周期,包括执行任务、处理异常、以及线程的退出。
以下是runWorker
方法的主要步骤:
- 初始化:
- 方法接收一个
Worker
对象w
作为参数。 - 获取当前线程
wt
,它是Worker
对象内部创建的线程。 - 将
Worker
的firstTask
字段赋值给局部变量task
,并将Worker
的firstTask
字段设置为null
,这样Worker
就不会再持有初始任务的引用。 - 调用
w.unlock()
,允许线程响应中断。
- 方法接收一个
- 循环执行任务:
- 进入一个循环,只要
task
不为null
或者从任务队列getTask()
中能够获取到任务,循环就会继续。 - 在循环体内,首先对
Worker
加锁,以防止在执行任务期间线程被中断。 - 检查线程池的状态,如果线程池正在停止,则确保当前线程被中断;如果线程池没有停止,则确保当前线程不被中断。
- 进入一个循环,只要
- 执行任务前后的钩子方法:
- 在执行任务之前,调用
beforeExecute
方法,这个方法可以被子类重写以执行一些预处理操作。 - 执行任务
task.run()
。 - 在任务执行完毕后,调用
afterExecute
方法,这个方法也可以被子类重写以执行一些后续处理操作。
- 在执行任务之前,调用
- 异常处理:
- 如果任务执行过程中抛出异常,会捕获这个异常,并在
finally
块中调用afterExecute
方法。 - 将异常再次抛出,这样会结束当前
Worker
的执行。
- 如果任务执行过程中抛出异常,会捕获这个异常,并在
- 任务执行完毕:
- 在
finally
块中,将task
设置为null
,增加Worker
完成的任务数completedTasks
,并释放Worker
的锁。
- 在
- 线程退出:
- 如果
getTask()
返回null
,循环结束,表示Worker
应该退出。 - 将
completedAbruptly
设置为false
,表示线程正常退出。 - 在
finally
块中调用processWorkerExit
方法来处理Worker
的退出,包括可能的线程池状态更新和线程的清理工作。
- 如果
查看代码
/**
* 执行工作线程的主循环。重复地从队列中获取任务并执行,同时处理以下问题:
*
* 1. 我们可能从初始任务开始,这种情况下我们不需要从队列中获取第一个任务。
* 否则,只要线程池在运行,就从getTask方法中获取任务。如果返回null,
* 则工作线程由于池状态或配置参数的改变而退出。其他退出情况是由于外部代码抛出的异常,
* 在这种情况下completedAbruptly为true,通常会触发processWorkerExit方法来替换这个线程。
*
* 2. 在执行任何任务之前,获取锁以防止任务执行期间其他池中断,
* 然后确保除非池正在停止,否则此线程的中断状态不会被设置。
*
* 3. 每个任务运行前都会调用beforeExecute方法,该方法可能会抛出异常,
* 在这种情况下,我们会让线程死亡(通过设置completedAbruptly为true来中断循环),
* 而不处理任务。
*
* 4. 假设beforeExecute正常完成,我们运行任务,收集它抛出的任何异常并发送到afterExecute。
* 我们分别处理RuntimeException和Error(规范保证我们会捕获它们)以及任意的Throwables。
* 因为在Runnable.run中我们不能重新抛出Throwables,所以我们将它们包装在Errors中传递出去(到线程的UncaughtExceptionHandler)。
* 任何抛出的异常也会保守地导致线程死亡。
*
* 5. 在task.run完成后,我们调用afterExecute,它也可能抛出异常,这将同样导致线程死亡。
* 根据JLS Sec 14.20,即使task.run抛出异常,这个异常也会生效。
*
* 异常处理机制的净效果是,afterExecute和线程的UncaughtExceptionHandler能够尽可能准确地获取用户代码遇到的问题信息。
*
* @param w 工作线程
*/
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取Worker的初始任务
Runnable task = w.firstTask;
// 清除Worker的初始任务引用
w.firstTask = null;
// 允许中断
w.unlock();
// 标记线程是否因为异常而突然终止
boolean completedAbruptly = true;
try {
// 循环直到没有任务可以执行
while (task != null || (task = getTask()) != null) {
// 获取锁
w.lock();
// 如果线程池正在停止,确保线程被中断;
// 如果没有停止,确保线程不被中断。这需要重新检查以处理shutdownNow竞争和清除中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在执行任务前调用钩子方法
beforeExecute(wt, task);
try {
// 执行任务
task.run();
// 在任务执行后调用钩子方法
afterExecute(task, null);
} catch (Throwable ex) {
// 在捕获异常后调用钩子方法
afterExecute(task, ex);
// 抛出异常,将导致线程终止
throw ex;
}
} finally {
// 清除任务引用
task = null;
// 增加完成的任务数
w.completedTasks++;
// 释放锁
w.unlock();
}
}
// 标记线程正常终止
completedAbruptly = false;
} finally {
// 处理工作线程退出
processWorkerExit(w, completedAbruptly);
}
}
getTask()
源码
/**
* 尝试从工作队列中获取任务,如果满足特定条件,则返回null,指示工作线程应该退出。
*
* @return 获取到的任务,或者null(如果工作线程需要退出)
*/
private Runnable getTask() {
// 标记上一次poll操作是否超时
boolean timedOut = false;
for (;;) {
// 获取当前线程池的控制状态
int c = ctl.get();
// 如果线程池已经关闭,并且队列中没有任务,则减少工作线程数量并返回null
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取当前工作线程的数量
int wc = workerCountOf(c);
// 判断工作线程是否有可能被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作线程数量超过最大线程数,或者线程等待任务超时且允许回收线程,
// 并且工作线程数量大于1或者队列为空,则尝试减少工作线程数量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 使用CAS操作减少工作线程数量,如果成功则返回null
if (compareAndDecrementWorkerCount(c))
return null;
// 如果减少工作线程数量失败,则继续循环
continue;
}
try {
// 如果允许回收线程,则使用带超时的poll方法获取任务,否则使用take方法阻塞等待任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 带超时的poll方法
workQueue.take(); // 阻塞等待任务
if (r != null)
return r; // 如果获取到任务,则返回任务
timedOut = true; // 如果没有获取到任务,标记为超时
} catch (InterruptedException retry) {
// 如果在等待任务时被中断,则重置超时标记并继续循环
timedOut = false;
}
}
}
processWorkerExit
源码
/**
* 执行清理和记录工作线程死亡的工作。仅在工作线程中调用。
* 除非completedAbruptly被设置,否则假设workerCount已经根据退出进行了调整。
* 这个方法从worker集合中移除线程,并且如果线程因用户任务异常而退出,
* 或者运行的线程少于corePoolSize,或者队列非空但没有线程,则可能终止线程池或替换工作线程。
*
* @param w 死亡的Worker对象
* @param completedAbruptly 如果Worker因用户异常而死亡,则为true
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) { // 如果突然终止,则workerCount尚未调整
decrementWorkerCount(); // 减少workerCount
}
final ReentrantLock mainLock = this.mainLock; // 获取主线程锁
mainLock.lock(); // 锁定主线程锁
try {
completedTaskCount += w.completedTasks; // 增加完成的任务数
workers.remove(w); // 从workers集合中移除w
} finally {
mainLock.unlock(); // 释放主线程锁
}
tryTerminate(); // 尝试终止线程池
int c = ctl.get(); // 获取ctl的状态
if (runStateLessThan(c, STOP)) { // 如果线程池状态小于STOP
if (!completedAbruptly) { // 如果不是突然终止
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 计算最小线程数
if (min == 0 && ! workQueue.isEmpty()) // 如果最小线程数为0且任务队列非空
min = 1; // 最小线程数为1
if (workerCountOf(c) >= min) // 如果当前工作线程数大于等于最小线程数
return; // 无需替换
}
addWorker(null, false); // 尝试添加一个新的工作线程
}
}
ExecutorCompletionService
ExecutorCompletionService
是 Java 并发工具包中的一个类,它将 Executor
和 CompletionService
的功能结合起来,提供了一个用于处理异步任务的解决方案。
类的作用
ExecutorCompletionService
的主要作用是简化异步任务的处理流程。当我们有一批任务需要通过 Executor
异步执行时,这个类可以帮助我们收集这些任务的执行结果,而不需要手动管理这些任务的未来(Future
)对象。
关键特性
- 基于
Executor
:ExecutorCompletionService
需要一个Executor
来实际执行任务。 - 结果队列:任务完成时,其结果(
Future
对象)会被放入一个队列中。这使得可以按任务完成的顺序来处理结果,而不是按照任务提交的顺序。 - 轻量级:适用于临时使用,特别是在处理任务组时。
构造函数
ExecutorCompletionService(Executor executor)
:使用指定的Executor
并创建一个默认的LinkedBlockingQueue
作为结果队列。ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
:使用指定的Executor
和提供的队列作为结果队列。
重要方法
submit(Callable<V> task)
:提交一个Callable
任务以供执行。submit(Runnable task, V result)
:提交一个Runnable
任务和其结果以供执行。take()
:从结果队列中取出并移除下一个已完成任务的结果,如果目前没有已完成的任务,则阻塞等待。poll()
:从结果队列中取出并移除下一个已完成任务的结果,如果没有则返回null
。poll(long timeout, TimeUnit unit)
:从结果队列中取出并移除下一个已完成任务的结果,如果在指定等待时间内没有已完成的任务,则返回null
。
内部类
QueueingFuture
:一个FutureTask
的子类,当任务完成时,它将任务的结果Future
放入结果队列。
属性
private final Executor executor;
- 用途:这个属性持有构造函数中传入的
Executor
对象的引用,用于执行提交的任务。
- 用途:这个属性持有构造函数中传入的
private final AbstractExecutorService aes;
- 用途:如果传入的
Executor
实际上是AbstractExecutorService
的一个实例,那么这个属性将持有它的引用。AbstractExecutorService
提供了一些额外的功能,比如能够创建RunnableFuture
实例。如果Executor
不是AbstractExecutorService
的实例,这个属性将被设置为null
。
- 用途:如果传入的
private final BlockingQueue<Future<V>> completionQueue;
- 用途:这个属性是一个阻塞队列,用于存放已完成任务的结果(即
Future
对象)。当任务完成时,它的Future
会被放入这个队列中,以便可以通过take()
,poll()
, 或poll(long timeout, TimeUnit unit)
方法检索。
- 用途:这个属性是一个阻塞队列,用于存放已完成任务的结果(即
使用场景
- 处理多个异步任务的结果:例如,我们可能有一组并行的计算任务,我们希望一旦某个任务完成,就立即处理其结果,而不是等待所有任务都完成。
- 实现“最快响应”模式:在这种模式下,我们可能只对第一个完成的任务感兴趣,并希望取消其他所有未完成的任务。
示例
以下是使用 ExecutorCompletionService
的一个简单示例:
查看代码
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
// 提交任务
completionService.submit(() -> doSomeComputation(1));
completionService.submit(() -> doSomeComputation(2));
completionService.submit(() -> doSomeComputation(3));
// 处理结果
for (int i = 0; i < 3; i++) {
try {
Future<Integer> future = completionService.take();
Integer result = future.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
在这个示例中,我们创建了一个固定大小的线程池和一个 ExecutorCompletionService
实例。我们提交了三个任务,并使用 take()
方法来按完成顺序处理结果。
参考链接: