Timer
java.util.Timer
是Java标准库中的一个类,用于调度一个java.util.TimerTask
任务,以便在某个时间点或者按照一定的周期执行。它是一种比较简单的调度机制,适用于轻量级的定时任务。
内部类
TimerThread
TimerThread
是Timer
的一个内部类,它是一个线程,用于执行所有的定时任务。它有一个重要的属性newTasksMayBeScheduled
,这个标志用于指示是否可以继续安排新的任务。如果设置为false
,则TimerThread
在处理完所有已安排的任务后将会终止。
TimerThread
类的主要逻辑是不断地检查任务队列,执行已到时间的任务,重新安排重复任务的时间,并从队列中移除已取消或已执行的非重复任务。当没有更多的任务可以执行,并且Timer
对象不再被引用时,线程会优雅地终止。
run 方法
查看代码
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
run
方法是线程的入口点,它调用mainLoop
方法来处理任务,并在方法结束时执行清理工作。如果在mainLoop
执行过程中线程被中断,则会进入finally
块,将newTasksMayBeScheduled
标志设置为false
,并清空任务队列,以确保不会有悬空引用。
mainLoop 方法
查看代码
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
mainLoop
方法是TimerThread
的核心,它不断循环,直到Timer
对象不再有活动的引用并且任务队列为空。
首先,线程会检查任务队列是否为空。如果队列为空且newTasksMayBeScheduled
为true
,则线程会等待,直到队列中有新的任务或newTasksMayBeScheduled
被设置为false
。
当队列非空时,线程取出队列中下次执行时间最紧迫的任务(getMin
方法)。
如果任务已经被取消(task.state == TimerTask.CANCELLED
),则将其从队列中移除并继续下一次循环。
如果任务的执行时间已经到达或超过当前时间,则执行任务(taskFired = true
)。
- 如果任务是非重复的(
task.period == 0
),则将其从队列中移除并设置任务状态为已执行。 - 如果任务是重复的,则根据任务的周期重新安排其执行时间。
如果任务的执行时间还未到达,则线程会等待直到任务执行时间到达。
执行任务时,不持有任何锁,以避免可能的死锁情况。
如果线程在等待过程中被中断,它将捕获InterruptedException
异常,但不会进行任何特殊处理,而是继续循环。
TaskQueue
TaskQueue
是另一个内部类,它实现了基于二叉堆的优先级队列。它存储了所有的TimerTask
对象,并根据它们的下次执行时间进行排序。
一份宝贵的二叉堆源码参考实现
class TaskQueue {
/**
* Priority queue represented as a balanced binary heap: the two children
* of queue[n] are queue[2*n] and queue[2*n+1]. The priority queue is
* ordered on the nextExecutionTime field: The TimerTask with the lowest
* nextExecutionTime is in queue[1] (assuming the queue is nonempty). For
* each node n in the heap, and each descendant of n, d,
* n.nextExecutionTime <= d.nextExecutionTime.
*/
private TimerTask[] queue = new TimerTask[128];
/**
* The number of tasks in the priority queue. (The tasks are stored in
* queue[1] up to queue[size]).
*/
private int size = 0;
/**
* Returns the number of tasks currently on the queue.
*/
int size() {
return size;
}
/**
* Adds a new task to the priority queue.
*/
void add(TimerTask task) {
// Grow backing store if necessary
if (size + 1 == queue.length)
queue = Arrays.copyOf(queue, 2*queue.length);
queue[++size] = task;
fixUp(size);
}
/**
* Return the "head task" of the priority queue. (The head task is an
* task with the lowest nextExecutionTime.)
*/
TimerTask getMin() {
return queue[1];
}
/**
* Return the ith task in the priority queue, where i ranges from 1 (the
* head task, which is returned by getMin) to the number of tasks on the
* queue, inclusive.
*/
TimerTask get(int i) {
return queue[i];
}
/**
* Remove the head task from the priority queue.
*/
void removeMin() {
queue[1] = queue[size];
queue[size--] = null; // Drop extra reference to prevent memory leak
fixDown(1);
}
/**
* Removes the ith element from queue without regard for maintaining
* the heap invariant. Recall that queue is one-based, so
* 1 <= i <= size.
*/
void quickRemove(int i) {
assert i <= size;
queue[i] = queue[size];
queue[size--] = null; // Drop extra ref to prevent memory leak
}
/**
* Sets the nextExecutionTime associated with the head task to the
* specified value, and adjusts priority queue accordingly.
*/
void rescheduleMin(long newTime) {
queue[1].nextExecutionTime = newTime;
fixDown(1);
}
/**
* Returns true if the priority queue contains no elements.
*/
boolean isEmpty() {
return size==0;
}
/**
* Removes all elements from the priority queue.
*/
void clear() {
// Null out task references to prevent memory leak
for (int i=1; i<=size; i++)
queue[i] = null;
size = 0;
}
/**
* Establishes the heap invariant (described above) assuming the heap
* satisfies the invariant except possibly for the leaf-node indexed by k
* (which may have a nextExecutionTime less than its parent's).
*
* This method functions by "promoting" queue[k] up the hierarchy
* (by swapping it with its parent) repeatedly until queue[k]'s
* nextExecutionTime is greater than or equal to that of its parent.
*/
private void fixUp(int k) {
while (k > 1) {
int j = k >> 1;
if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}
/**
* Establishes the heap invariant (described above) in the subtree
* rooted at k, which is assumed to satisfy the heap invariant except
* possibly for node k itself (which may have a nextExecutionTime greater
* than its children's).
*
* This method functions by "demoting" queue[k] down the hierarchy
* (by swapping it with its smaller child) repeatedly until queue[k]'s
* nextExecutionTime is less than or equal to those of its children.
*/
private void fixDown(int k) {
int j;
while ((j = k << 1) <= size && j > 0) {
if (j < size &&
queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
j++; // j indexes smallest kid
if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}
/**
* Establishes the heap invariant (described above) in the entire tree,
* assuming nothing about the order of the elements prior to the call.
*/
void heapify() {
for (int i = size/2; i >= 1; i--)
fixDown(i);
}
}
主要方法:
add(TimerTask task)
: 添加一个任务到队列中。getMin()
: 获取并返回队列中下次执行时间最紧迫的任务。removeMin()
: 移除并返回队列中下次执行时间最紧迫的任务。rescheduleMin(long newTime)
: 重新安排队列中最紧迫任务的执行时间。
ThreadReaper
ThreadReaper
是Timer
的一个内部类,实现了Runnable
接口。它被用于在Timer
对象没有任何活动的引用并且任务队列为空时,优雅地终止TimerThread
。 主要方法:
run()
: 更新队列状态并通知TimerThread
。
Timer
属性
queue
: 类型为TaskQueue
,是Timer
的核心,用于存储和管理所有的TimerTask
。thread
: 类型为TimerThread
,是执行任务的线程。cleanup
: 类型为Cleanable
,用于在Timer
对象不再被使用时进行清理。
主要组件
Timer
类:这是主要的调度类,负责管理调度任务。TimerTask
抽象类:这是一个实现了Runnable
接口的抽象类,代表一个可以被调度的任务。
主要方法
schedule(TimerTask task, long delay)
:安排任务task在延迟delay毫秒后执行一次。schedule(TimerTask task, Date time)
:安排任务task在指定的时间点time执行一次。schedule(TimerTask task, long delay, long period)
:安排任务task在延迟delay毫秒后开始执行,然后每隔period毫秒重复执行一次。schedule(TimerTask task, Date firstTime, long period)
:安排任务task在指定的时间点firstTime开始执行,然后每隔period毫秒重复执行一次。scheduleAtFixedRate(TimerTask task, long delay, long period)
:安排任务task在延迟delay毫秒后开始执行,然后每隔period毫秒重复执行一次。这个方法的特点是尽可能保持任务的执行频率,如果任务执行时间过长导致下一次执行时间被延迟,它会尝试缩短后续的执行间隔来尽量弥补时间差。scheduleAtFixedRate(TimerTask task, Date firstTime, long period)
:与scheduleAtFixedRate(TimerTask task, long delay, long period)
类似,只是指定了第一次执行的时间点。cancel()
:取消Timer
中的所有已安排的任务,并且释放所有资源。当前正在执行的任务不受影响,会继续执行完毕。
注意事项
Timer
只有一个线程来执行所有的TimerTask
,这意味着如果有多个任务,它们将顺序执行,前一个任务完成之后,下一个任务才开始执行。如果一个任务的执行时间过长,它会阻塞其他任务的执行。- 如果在
TimerTask
的run
方法中抛出了未捕获的异常,Timer
将停止所有任务的执行。 Timer
不会捕获TimerTask
中的异常,因此需要确保TimerTask
中的代码能够正确处理异常,避免影响其他任务的执行。
示例
查看代码
import java.util.Timer;
import java.util.TimerTask;
public class TimerExample {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
System.out.println("Task is running...");
}
};
timer.schedule(task, 2000, 1000); // 2秒后开始执行,然后每1秒执行一次
}
}
在上面的示例中,我们创建了一个Timer
对象和一个TimerTask
对象。然后我们使用Timer
的schedule
方法来安排这个任务在2秒后开始执行,之后每秒钟执行一次。
优点:
- 简单易用:适用于简单的定时任务需求。
- 轻量级:不需要依赖其他库或框架。
缺点:
- 单线程:所有任务都在同一个线程中执行,如果一个任务执行时间过长,会阻塞其他任务的执行。
- 异常处理:不会捕获
TimerTask
中的异常,需要自己处理异常。 - 不支持并发:不适用于需要多线程执行的任务。
ScheduledExecutorService
ScheduledExecutorService
是Java 5引入的一个接口,它是ExecutorService
的子接口,用于调度任务在未来某个时间点执行一次或者定期执行。它比java.util.Timer
更加强大和灵活,因为它可以使用多个线程来执行任务,并且能够处理异常。
主要特点
- 多线程执行:
ScheduledExecutorService
可以在多个线程中执行任务,这意味着即使一个任务长时间运行,它也不会阻塞其他任务的执行。 - 任务调度:可以调度任务在未来某个时间点执行一次,也可以定期执行。
- 异常处理:
ScheduledExecutorService
会捕获任务执行中的异常,并且可以通过返回的Future
对象来获取异常信息。 - 灵活性强:可以通过
ScheduledExecutorService
来创建不同类型的调度任务,如固定延迟执行、固定频率执行等。
主要方法
schedule(Runnable command, long delay, TimeUnit unit)
:安排一个Runnable任务在延迟delay时间单位后执行一次。schedule(Callable<V> callable, long delay, TimeUnit unit)
:安排一个Callable任务在延迟delay时间单位后执行一次,并返回一个Future对象,可以通过该对象获取任务的结果或者检查任务是否完成。scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:安排一个Runnable任务在延迟initialDelay时间单位后开始执行,然后每隔period时间单位重复执行一次。如果任务执行时间过长导致下一次执行时间被延迟,它会尝试缩短后续的执行间隔来尽量弥补时间差。scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:安排一个Runnable任务在延迟initialDelay时间单位后开始执行,然后在上一次任务执行完毕后延迟delay时间单位再执行下一次任务。
示例
查看代码
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceExample {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable task = new Runnable() {
public void run() {
System.out.println("Task is running...");
}
};
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS); // 0秒后开始执行,然后每1秒执行一次
}
}
在上面的示例中,我们创建了一个ScheduledExecutorService
对象,并安排了一个Runnable任务在0秒后开始执行,然后每秒钟执行一次。这个示例使用了固定频率的调度方式,如果任务执行时间过长,下一次执行可能会被延迟。
优点:
- 多线程:可以在多个线程中执行任务,避免单个任务阻塞其他任务。
- 异常处理:会捕获任务执行中的异常,并且可以通过返回的
Future
对象来获取异常信息。 - 灵活性强:支持多种调度模式,如固定延迟执行、固定频率执行等。
缺点:
- 复杂性:相对于
Timer
,ScheduledExecutorService
的使用稍微复杂一些。 - 资源消耗:多线程执行可能会增加资源的消耗。
Spring Task
Spring Task的底层原理主要基于Spring的IoC容器和AOP框架。它通过一系列的注解和接口,实现了对定时任务的配置和管理。
工作原理
配置
在Spring配置类中启用定时任务支持,通常通过添加
@EnableScheduling
注解来完成。ScheduledAnnotationBeanPostProcessor
当使用
@Scheduled
注解时,Spring会通过ScheduledAnnotationBeanPostProcessor
来处理这些注解。这个bean后处理器会在应用程序上下文初始化时扫描所有带有@Scheduled
注解的方法,并将这些方法注册为定时任务。任务调度
一旦任务被注册,Spring会根据所配置的
TaskScheduler
来调度这些任务。默认情况下,Spring使用ThreadPoolTaskScheduler
作为TaskScheduler
的实现,它可以配置一个线程池来高效地执行定时任务。执行任务
当到了预定的执行时间,
TaskScheduler
会从线程池中选择一个线程来执行注册的定时任务。如果任务执行失败,Spring默认不会自动重试任务。任务取消与终止
可以通过调用
TaskScheduler
的相关方法来取消尚未执行的任务。此外,如果一个任务正在执行过程中,无法直接取消它,但可以通过抛出异常等方式来终止任务的执行。
@EnableScheduling
注解
当我们在一个配置类上添加@EnableScheduling
注解时,它会启用Spring的定时任务功能。
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class SchedulerConfig {
// 其他bean定义
}
@EnableScheduling
注解主要的作用是通过@Import
注解导入了一个名为SchedulingConfiguration
的配置类,这个配置类中定义了一个ScheduledAnnotationBeanPostProcessor
的Bean,这个bean后处理器会在应用程序上下文初始化时扫描所有带有@Scheduled
注解的方法,并将这些方法注册为定时任务。
它的构造函数中定义了ScheduledTaskRegistrar
,主要用于负责注册和取消任务。
ScheduledTaskRegistrar
类
ScheduledTaskRegistrar
类是Spring Task的核心类,它负责管理任务调度。它包含了三个主要的组件:
TaskScheduler
:负责执行任务。TriggerTask
:封装了任务和触发器的信息。CronTask
:封装了任务和cron表达式的信息。
ScheduledTaskRegistrar
通常不会直接使用,它是由Spring自动配置的。但是,我们可以自定义任务注册:
查看代码
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
@Configuration
@EnableScheduling
public class SchedulerConfig {
public ScheduledTaskRegistrar scheduledTaskRegistrar() {
ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
registrar.setTaskScheduler(taskScheduler());
return registrar;
}
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("MyScheduler-");
return scheduler;
}
}
以上自定义了一个ScheduledTaskRegistrar
,并设置了一个ThreadPoolTaskScheduler
来执行任务。
TaskScheduler
接口
TaskScheduler
接口定义了任务调度的方法,如schedule
、scheduleAtFixedRate
等。Spring提供了一个默认的实现ThreadPoolTaskScheduler
,它使用了一个线程池来执行任务。我们可以在配置中定义一个TaskScheduler
类型的Bean来自定义任务调度器的行为。
ThreadPoolTaskScheduler
ThreadPoolTaskScheduler
实现了 TaskScheduler
接口,用于调度任务。
ThreadPoolTaskScheduler
类通过配置和扩展底层的 ScheduledExecutorService
,提供了更加灵活和易用的定时任务调度功能。它支持任务执行的错误处理、线程池大小配置、任务取消策略等高级特性,是 Spring 框架中实现定时任务的重要组件。
继承关系
ThreadPoolTaskScheduler
继承自ExecutorConfigurationSupport
。ExecutorConfigurationSupport
是一个抽象类,它提供了创建和管理ExecutorService
的基础功能。
内部类
DelegatingErrorHandlingCallable<V>
:这是一个内部类,用于包装Callable<V>
类型的任务,并在执行时处理异常。
属性
poolSize
:线程池的大小。removeOnCancelPolicy
:是否在任务被取消时移除任务。continueExistingPeriodicTasksAfterShutdownPolicy
:在关闭时是否继续执行周期性任务。executeExistingDelayedTasksAfterShutdownPolicy
:在关闭时是否执行剩余的延迟任务。errorHandler
:错误处理策略。clock
:用于定时任务的时钟。scheduledExecutor
:底层的ScheduledExecutorService
。listenableFutureMap
:一个映射,将ScheduledFuture<?>
与ListenableFuture<?>
关联起来。
主要方法
setPoolSize(int poolSize)
:设置线程池的大小。setRemoveOnCancelPolicy(boolean flag)
:设置任务取消时的行为。setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean flag)
:设置关闭时周期性任务的行为。setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean flag)
:设置关闭时剩余延迟任务的行为。setErrorHandler(ErrorHandler errorHandler)
:设置错误处理策略。setClock(Clock clock)
:设置用于定时任务的时钟。getScheduledExecutor()
:返回底层的ScheduledExecutorService
。schedule(Runnable task, Trigger trigger)
:根据给定的触发器调度任务。submit(Runnable task)
:提交一个任务。submit(Callable<T> task)
:提交一个可返回结果的任务。submitListenable(Runnable task)
:提交一个可以监听结果的任务。submitListenable(Callable<T> task)
:提交一个可以监听结果的可返回结果的任务。
@Scheduled
注解
当我们在一个方法上添加@Scheduled
注解时,Spring会通过AOP机制创建一个代理对象,这个代理对象会拦截方法的调用,并将方法的执行交给TaskScheduler
来调度。@Scheduled
注解中可以指定任务的调度规则,如固定的延迟、固定的频率或者cron表达式。
任务执行
当任务被调度执行时,TaskScheduler
会从线程池中获取一个线程来执行任务。如果任务执行过程中抛出了异常,Spring会将异常记录下来,但不会影响其他任务的执行。
任务取消
当Spring容器关闭时,ScheduledTaskRegistrar
会负责取消所有注册的任务,以避免在容器关闭后任务仍然在执行。
如果我们需要在运行时取消任务,可以这样做:
查看代码
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
@Component
public class MyCancelableTask {
private final TaskScheduler taskScheduler;
private ScheduledFuture<?> future;
public MyCancelableTask(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
this.future = taskScheduler.schedule(new Runnable() {
@Override
public void run() {
System.out.println("Task is running...");
}
}, new CronTrigger("0 * * * * ?"));
}
public void cancelTask() {
if (future != null) {
future.cancel(true);
}
}
}
上面我们通过ScheduledFuture
来取消一个任务。当cancelTask
方法被调用时,任务将被取消。