Future
Future
接口位于java.util.concurrent
包内,被设计用来表示异步计算的结果,并提供了一套方法来管理和获取这些结果。
计算结果只能通过get
方法在计算完成时获取,如果需要,该方法会阻塞调用线程。
取消操作通过cancel
方法执行。接口还提供了其他方法来确定任务是正常完成还是被取消了。
一旦任务完成了计算,就不能再对其进行取消操作。Future
对象的状态变为已完成(无论是正常完成还是因为异常),之后任何取消请求都将不会成功。
主要方法
方法签名 | 描述 |
---|---|
boolean cancel(boolean mayInterruptIfRunning) | 尝试取消任务的执行。如果任务已经完成或取消,或者由于其他原因无法取消,则该方法无效。如果任务还未开始,调用cancel 后任务将不会运行。如果任务已经开始,mayInterruptIfRunning 参数将决定是否中断执行任务的线程。 |
boolean isCancelled() | 如果任务在正常完成前被取消,返回true 。 |
boolean isDone() | 如果任务已完成(无论是正常终止、异常还是取消),返回true 。 |
V get() throws InterruptedException, ExecutionException | 如果需要,等待计算完成,然后检索结果。 |
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException | 最多等待给定时间,然后检索结果(如果可用)。 |
RunnableFuture
RunnableFuture
接口同样位于java.util.concurrent
包中,它扩展了两个接口:Runnable
和Future<V>
,用于表示一个异步计算任务,它可以在执行完成后提供结果。
主要方法
void run()
: 这个方法是Runnable
接口中定义的方法,它应该包含实现类的计算逻辑。在RunnableFuture
的上下文中,成功执行run
方法会导致Future
完成,并允许访问其结果。
FutureTask
FutureTask
类同样位于java.util.concurrent
包中,它实现了 RunnableFuture<V>
接口。
FutureTask
是一个可取消的异步计算任务。它提供了启动和取消计算、查询计算是否完成以及获取计算结果的方法。结果只能在计算完成后获取,如果计算尚未完成,get
方法将阻塞。一旦计算完成,就不能重新启动或取消计算(除非使用 runAndReset
方法)。
主要功能
执行异步计算:FutureTask
可以包装一个 Callable<V>
或 Runnable
对象,并通过 Executor
执行。
获取结果:一旦计算完成,可以通过 get
方法获取结果,该方法可能会阻塞调用线程直到结果准备好。
取消计算:可以通过 cancel
方法尝试取消正在运行的任务。
检查状态:可以检查任务是正常完成、异常结束还是被取消。
主要字段
字段名称 | 类型 | 描述 | 说明 |
---|---|---|---|
state | int | 表示任务的状态,是volatile 的以确保其对所有线程的可见性。 | 可能的值: - NEW :任务刚刚创建,尚未开始执行。- COMPLETING :任务正在执行,结果即将被设置。- NORMAL :任务已完成正常执行。- EXCEPTIONAL :任务在执行过程中抛出了异常。- CANCELLED :任务已被取消。- INTERRUPTING :任务正在被中断。- INTERRUPTED :任务已被中断。 |
callable | Callable<V> | 存储了任务执行的实际Callable 对象,提供了计算结果的方法。 | 当FutureTask 完成时,callable 将被设置为null 。 |
outcome | Object | 存储了任务的结果或异常。 | 如果任务正常完成,outcome 将包含计算的结果;如果任务抛出异常,outcome 将包含异常对象。 |
runner | Thread | 存储了正在执行callable 的线程。 | 这个字段是通过volatile 关键字保证可见性的,但在run 方法中,它可能会被多次重置为null 。 |
waiters | volatile WaitNode | 一个简单的Treiber栈(无锁并发栈),用于存储等待任务完成的线程。 | 用于实现FutureTask 的阻塞和解除阻塞机制,确保在任务完成时能够通知等待的线程。 |
构造方法
FutureTask(Callable<V> callable)
:创建一个 FutureTask
,它将在运行时执行给定的 Callable
。
FutureTask(Runnable runnable, V result)
:创建一个 FutureTask
,它将在运行时执行给定的 Runnable
,并在成功完成后返回指定的结果。
主要方法
get()
get
方法是外部接口,用于等待计算完成并获取结果。它的执行流程如下:
- 检查任务状态。如果任务已完成(状态大于
COMPLETING
),直接返回状态。 - 如果任务未完成,调用
awaitDone
方法,该方法会阻塞当前线程直到任务完成。 awaitDone
方法返回后,调用report
方法来获取结果,并返回这个结果。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone
awaitDone
方法是内部实现,用于等待计算完成。它的执行流程如下:
- 初始化一些变量,包括开始时间、等待节点
q
和是否已排队queued
。 - 进入一个无限循环,不断检查任务状态。
- 如果任务已完成(状态大于
COMPLETING
),移除等待节点q
(如果存在),并返回状态。 - 如果任务正在完成(状态为
COMPLETING
),进行一次线程让步。 - 如果当前线程被中断,移除等待节点
q
,并抛出InterruptedException
。 - 如果等待节点
q
不存在,创建一个新的等待节点。 - 如果等待节点
q
未排队,尝试将其添加到等待队列中。 - 如果设置了超时时间,使用
LockSupport.parkNanos
方法来阻塞当前线程。如果未设置超时时间或超时时间已过,直接返回状态。 - 如果当前线程被唤醒,重新检查任务状态。
- 如果
awaitDone
方法返回,表示任务已完成。如果任务未完成,调用removeWaiter
方法来清理等待队列,并返回状态。
查看代码
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 初始化开始时间变量,特殊值 0L 表示尚未进行 park 操作
long startTime = 0L; // Special value 0L means not yet parked
// 初始化等待节点 q
WaitNode q = null;
// 初始化是否已排队的标志 queued
boolean queued = false;
// 进入无限循环,等待计算完成
for (;;) {
// 获取当前任务状态
int s = state;
// 如果状态大于 COMPLETING,表示任务已完成
if (s > COMPLETING) {
// 如果等待节点 q 不为空,将其线程设置为 null
if (q != null)
q.thread = null;
// 返回当前状态
return s;
}
// 如果状态是 COMPLETING,表示任务正在完成
else if (s == COMPLETING) {
// 当前线程可能已经承诺完成,因此不应该返回空结果或抛出 InterruptedException
Thread.yield();
}
// 如果当前线程被中断,从等待队列中移除当前线程,并抛出 InterruptedException
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 如果等待节点 q 不存在,创建一个新的等待节点
else if (q == null) {
if (timed && nanos <= 0L)
// 如果设置了超时时间并且已过,或者未设置超时时间,直接返回当前状态
return s;
// 创建新的等待节点 q
q = new WaitNode();
}
// 如果等待节点 q 未排队,尝试将其添加到等待队列中
else if (!queued) {
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
}
// 如果设置了超时时间,使用 LockSupport.parkNanos 方法来阻塞当前线程
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // 第一次调用
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// 再次检查状态,以避免在 LockSupport.parkNanos 阻塞前状态发生变化
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
// 如果未设置超时时间,使用 LockSupport.park 方法来阻塞当前线程
else
LockSupport.park(this);
}
}
awaitDone
调用的方法:
查看代码
private void removeWaiter(WaitNode node) {
// 如果等待节点不为空,设置其线程为 null
if (node != null) {
node.thread = null;
// 进入无限循环,重新检查等待队列中的节点
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 如果找到有线程的节点,将其从队列中移除
if (q.thread != null) {
pred = q;
}
// 如果找到没有线程的节点,将其从队列中移除
else if (pred != null) {
pred.next = s;
// 检查是否有竞争条件
if (pred.thread == null) // check for race
continue retry;
}
// 如果等待队列中没有其他节点,退出循环
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
// 退出循环
break;
}
}
}
get()
方法最后调用的方法:
查看代码
private V report(int s) throws ExecutionException {
// 获取计算的结果或异常
Object x = outcome;
// 如果状态是 NORMAL,表示计算正常完成
if (s == NORMAL) {
// 返回计算的结果
return (V)x;
}
// 如果状态是大于等于 CANCELLED,表示计算被取消了
if (s >= CANCELLED) {
// 抛出 CancellationException
throw new CancellationException();
}
// 如果状态是 EXCEPTIONAL,表示计算过程中抛出了异常
if (s == EXCEPTIONAL) {
// 抛出 ExecutionException,其中包含计算过程中抛出的异常
throw new ExecutionException((Throwable)x);
}
// 如果以上条件都不满足,则不应该执行到这里
throw new InternalError("Should not reach here");
}
run()
run
方法是实现 Runnable
接口的方法,用于执行包装的 Callable
或 Runnable
对象。
查看代码
public void run() {
// 检查状态,如果状态不是 NEW 或者不能成功设置当前线程为执行线程,则直接返回
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
// 获取包装的 Callable 对象
Callable<V> c = callable;
// 如果 Callable 对象存在并且状态仍然是 NEW,则执行 call 方法
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行 call 方法,获取结果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 如果 call 方法抛出异常,则设置异常为任务的结果
result = null;
ran = false;
setException(ex);
}
// 如果 call 方法成功执行,则通过 set 方法将计算结果设置为 call 方法的返回值
if (ran)
set(result);
}
} finally {
// 执行线程必须保持非空,直到状态被设置为已完成,以防止并发调用 run()
runner = null;
// 在设置 runner 为 null 后重新读取状态,以防止泄漏中断
int s = state;
// 如果状态是 INTERRUPTING 或 INTERRUPTED,则处理可能的取消中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其他方法
方法签名 | 描述 |
---|---|
cancel(boolean mayInterruptIfRunning) | 尝试取消任务的执行。如果任务已经开始,并且 mayInterruptIfRunning 为 true ,则尝试中断执行任务的线程。 |
isCancelled() | 如果任务在正常完成前被取消,返回 true 。 |
isDone() | 如果任务已完成(无论是正常终止、异常还是取消),返回 true 。 |
get(long timeout, TimeUnit unit) | 等待最多给定时间,然后返回结果(如果可用)。如果超时,则抛出 TimeoutException 。 |
done() | 当任务转换到完成状态时调用,默认实现什么也不做,子类可以覆盖此方法以执行回调或进行簿记。 |
set(V v) | 如果任务尚未设置或取消,则将结果设置为给定值。 |
setException(Throwable t) | 如果任务尚未设置或取消,则将异常设置为给定值。 |
runAndReset() | 执行计算而不设置其结果,并尝试将任务重置为初始状态。这对于设计为多次执行的任务很有用。 |
CompletableFuture
CompletableFuture<T>
类扩展了Future
接口,并实现了CompletionStage<T>
接口。
主要属性
属性名 | 描述 |
---|---|
volatile Object result; | 这个字段用于存储CompletableFuture 的结果或者异常。它是一个volatile 类型,这意味着对它的写操作对所有线程立即可见。当CompletableFuture 完成时,这个字段会被设置。如果结果是正常的,它将直接存储计算的结果。如果结果是一个异常,它将存储一个AltResult 对象,这个对象封装了异常信息。AltResult 类是CompletableFuture 的一个静态内部类,用于封装异常或者表示null 值。 |
volatile Completion stack; | 这个字段代表了一个依赖操作的栈,称为Treiber栈。每个Completion 对象都表示一个依赖于当前CompletableFuture 的操作。当CompletableFuture 完成时,它将遍历这个栈并执行所有相关的操作。Completion 类及其子类定义了如何处理不同类型的依赖操作,例如单个输入(UniCompletion )、两个输入(BiCompletion )等。 |
static final AltResult NIL; | 这是一个静态常量,用于表示null 结果。由于result 字段不能直接存储null ,所以使用NIL 来代替。 |
private static final boolean USE_COMMON_POOL; | 这个静态常量决定了是否使用ForkJoinPool.commonPool() 作为默认执行器。如果并行度大于1,则使用commonPool 。 |
private static final Executor ASYNC_POOL; | 如果支持并行,这个属性将引用ForkJoinPool.commonPool() ,否则它将引用一个ThreadPerTaskExecutor ,即每个任务都在一个新线程中执行。 |
static final int SYNC, ASYNC, NESTED; | 这些是用于Completion.tryFire 方法的模式常量。SYNC 表示同步模式,ASYNC 表示异步模式,NESTED 表示嵌套模式。 |
private static final VarHandle RESULT, STACK, NEXT; | 这些VarHandle 实例用于原子地更新字段。它们提供了对result 、stack 以及Completion 链表中下一个元素的原子操作。 |
主要作用
- 异步任务执行:可以用来执行异步任务,而不需要阻塞调用线程。
- 结果处理:可以在异步任务完成时对结果进行处理。
- 异常处理:可以优雅地处理异步任务中出现的异常。
- 任务组合:可以将多个
CompletableFuture
实例组合起来,进行更复杂的异步流程控制。 - 任务交互:可以用于实现任务之间的交互,例如一个任务的执行依赖于另一个任务的结果。
- 支持链式调用:CompletableFuture支持链式调用,可以通过调用
thenAccept()
、thenRun()
、thenApply()
等方法来添加新的操作,并将它们链接到当前的任务上。
主要方法
方法分类 | 方法名 | 描述 |
---|---|---|
静态工厂方法 | completedFuture(U value) | 返回一个已经计算好的 CompletableFuture 。 |
runAsync(Runnable runnable) | 异步执行给定的 Runnable ,没有返回值。 | |
supplyAsync(Supplier<U> supplier) | 异步执行给定的 Supplier 函数,有返回值。 | |
allOf(CompletableFuture<?>... cfs) | 当所有的给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture 。 | |
anyOf(CompletableFuture<?>... cfs) | 当任何一个给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture 。 | |
结果处理方法 | thenApply(Function<? super T,? extends U> fn) | 当原始 CompletableFuture 计算完成时,使用该结果作为参数执行给定的函数,并返回一个新的 CompletableFuture 。 |
thenAccept(Consumer<? super T> action) | 当原始 CompletableFuture 计算完成时,使用该结果执行给定的动作,没有返回值。 | |
thenRun(Runnable action) | 当原始 CompletableFuture 计算完成时,执行给定的动作,没有返回值。 | |
异常处理方法 | exceptionally(Function<Throwable,? extends T> fn) | 当原始 CompletableFuture 计算异常时,使用给定的函数处理异常,并返回一个新的 CompletableFuture 。 |
handle(BiFunction<? super T, Throwable, ? extends U> fn) | 当原始 CompletableFuture 计算完成或异常时,使用给定的函数处理结果或异常,并返回一个新的 CompletableFuture 。 | |
任务组合方法 | thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) | 当原始 CompletableFuture 计算完成时,将其结果作为参数传递给给定的函数,该函数返回一个新的 CompletionStage ,最后返回一个新的 CompletableFuture 。 |
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 当原始 CompletableFuture 和给定的 CompletionStage 都完成时,使用这两个结果作为参数执行给定的函数,并返回一个新的 CompletableFuture 。 | |
任务交互方法 | applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) | 当原始 CompletableFuture 或给定的 CompletionStage 中的任意一个完成时,使用该结果执行给定的函数,并返回一个新的 CompletableFuture 。 |
使用示例
使用 CompletableFuture
实现一个线程等待多个线程执行完毕后再执行的例子
查看代码
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个 CompletableFuture 数组来存储所有的线程任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("线程1开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1执行完毕");
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("线程2开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2执行完毕");
});
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("线程3开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程3执行完毕");
});
// 使用 allOf 方法等待所有的 CompletableFuture 执行完毕
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
// 等待所有任务完成
combinedFuture.get();
// 所有线程执行完毕后,执行主线程的任务
System.out.println("所有线程执行完毕,主线程继续执行");
}
}