时间轮算法
时间轮(Timing Wheel)
时间轮是一种高效的定时任务调度数据结构,核心目标是解决大规模定时任务场景下的性能瓶颈问题。最早应用于操作系统内核的定时器管理,后被广泛移植到中间件(如Kafka、Netty)中。
核心原理是模拟时钟轮盘的周期性转动,通过数组槽位 + 双向链表 + 每隔 tickDuration 时间向前移动一个槽位,实现 O(1) 复杂度的定时任务调度。
传统定时任务的问题
传统定时任务的实现方式(如基于Timer + 优先级队列)在高并发、大规模任务场景下存在明显缺陷:
-
性能瓶颈 基于优先级队列(如Java的
DelayQueue)的定时任务,每次添加/删除任务的时间复杂度为 $O(logN)$,当任务数量达到百万级甚至千万级时,频繁的队列操作会导致CPU占用率飙升,调度延迟显著增加。 同时,优先级队列需要全局遍历或排序,无法高效支持任务的批量触发和周期任务的重复调度。 -
资源浪费 简单的定时轮询方案(如每隔1ms扫描所有任务)会造成大量无效遍历,尤其当大部分任务的触发时间较远时,CPU资源被严重浪费。
典型应用场景
- 中间件:Kafka的延时消息、Netty的I/O超时管理、Redis的过期键清理;
- 分布式系统:微服务的超时重试、分布式锁的续租、任务调度框架的定时任务;
- 操作系统:内核定时器、进程调度的延时触发。
主要组成部分
- 时间轮核心(Wheel)
1.1 由一个固定长度的数组构成,数组的每个元素称为一个 Slot(槽位)。
1.2 槽位数量决定了时间轮的“周长”,单个槽位对应一个固定的时间间隔(tickDuration)。
1.3 例如:tickDuration=100ms,槽位数量=512,则时间轮转一圈的总时间为 100ms * 512 = 51.2s。
- 指针(Wheel Pointer)
2.1 一个单线程驱动的指针,每隔 tickDuration 时间向前移动一个槽位。
2.1 指针移动时,会执行当前槽位上的所有到期任务。
- 任务节点(Timeout Task)
3.1 每个定时任务被封装为 Timeout 对象,包含延迟时间、任务逻辑、所属时间轮。
3.2 任务被添加时,会根据延迟时间计算出需要被放入的目标槽位,以及需要等待的轮次(rounds)。
3.3 同一个槽位的多个任务通过双向链表串联,避免数组扩容开销。
主要工作流程
1. 任务添加
1.1 计算任务的总延迟时间与 tickDuration 的比值,得到总刻度数(ticks)。
1.2 目标槽位索引 = (当前指针位置 + ticks) % 槽位数量。
1.3 任务需要等待的轮次 = ticks / 槽位数量(若轮次 > 0,说明需要等待时间轮多转几圈才会执行)。
1.4 将任务添加到目标槽位的双向链表中。
2. 任务执行
2.1 时间轮线程每隔 tickDuration 移动一次指针。
2.2 指针到达某槽位时,遍历该槽位的任务链表:
2.3 若任务的剩余轮次 = 0,直接执行任务。
2.4 若任务的剩余轮次 > 0,将轮次减 1,等待下一轮指针到达时再判断。
3. 任务删除
3.1 由于任务存储在双向链表中,删除时只需修改链表节点的前后引用,时间复杂度为 O(1)。
3.2 已取消的任务会被标记为 cancelled,执行时会被跳过。
关键特性与底层设计思路
• O(1) 复杂度:任务的添加、删除、执行均通过数组索引和链表操作实现,无排序开销。
• 单线程驱动:避免多线程竞争,资源开销低,但耗时任务会阻塞后续任务执行。
• 精度限制:时间精度由 tickDuration 决定,任务实际执行时间与预期时间的误差不超过 tickDuration。
• 轮次机制:通过轮次实现超过时间轮一圈总时间的长延迟任务,无需扩容数组。
代码Demo
简易的单层时间轮算法:
import java.util.ArrayDeque;
import java.util.Queue;
/**
* 简易单轮时间轮(支持跨周期任务,原理与 Netty HashedWheelTimer 一致)
* 关键参数:
* - tickDuration:每个刻度的时间单位(如 100ms)
* - wheelSize:时间轮的槽数量(如 10)
* - 总周期 = tickDuration * wheelSize(如 100ms * 10 = 1000ms)
*/
public class HashedWheelTimer {
// 时间轮的槽:每个槽存储待执行的任务队列
private final Queue<TimerTask>[] wheel;
// 每个刻度的时间单位(毫秒)
private final long tickDuration;
// 时间轮的槽数量
private final int wheelSize;
// 时间轮当前指针位置
private volatile int currentIndex = 0;
// 驱动时间轮的工作线程
private final Thread workerThread;
// 用于 wait/notify 的锁对象,保证线程安全
private final Object lock = new Object();
// 时间轮运行状态标记
private volatile boolean running = true;
/**
* 任务封装类:核心包含 任务逻辑、任务名称、剩余轮次(跨周期关键)
*/
public static class TimerTask implements Runnable {
private final Runnable task;
private final String taskName;
// 剩余轮次:0 表示当前轮次执行,>0 表示需要等待对应轮数后执行
private int remainingRounds;
public TimerTask(Runnable task, String taskName, int remainingRounds) {
this.task = task;
this.taskName = taskName;
this.remainingRounds = remainingRounds;
}
@Override
public void run() {
task.run();
}
}
public HashedWheelTimer(long tickDuration, int wheelSize) {
this.tickDuration = tickDuration;
this.wheelSize = wheelSize;
// 初始化槽数组:每个槽对应一个任务队列
this.wheel = new Queue[wheelSize];
for (int i = 0; i < wheelSize; i++) {
wheel[i] = new ArrayDeque<>();
}
// 初始化并启动工作线程
this.workerThread = new Thread(this::workerLoop, "TimeWheel-Worker");
this.workerThread.setDaemon(true); // 设为守护线程,不阻塞程序退出
this.workerThread.start();
}
/**
* 工作线程核心循环:通过 wait() 实现精准刻度等待,推进指针并处理任务
*/
private void workerLoop() {
long currentTime = System.currentTimeMillis();
while (running) {
// 计算下一个刻度的截止时间
long nextTickTime = currentTime + tickDuration;
long waitTime = nextTickTime - System.currentTimeMillis();
// 等待下一个刻度,精准控制时间轮推进速度
if (waitTime > 0) {
synchronized (lock) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
// 线程被中断时,退出循环停止时间轮
return;
}
}
}
// 处理当前指针位置槽的所有任务
processCurrentBucket();
// 移动指针:循环移动(0 -> wheelSize-1 -> 0)
currentIndex = (currentIndex + 1) % wheelSize;
currentTime = nextTickTime;
}
}
/**
* 处理当前槽的任务:核心逻辑 - 轮次计数判断(跨周期关键)
*/
private void processCurrentBucket() {
Queue<TimerTask> currentBucket = wheel[currentIndex];
// 临时队列:存放需要重新入队的任务(剩余轮次 > 0)
Queue<TimerTask> reEnqueueTasks = new ArrayDeque<>();
TimerTask task;
while ((task = currentBucket.poll()) != null) {
if (task.remainingRounds > 0) {
// 轮次未到:剩余轮次减 1,重新放入当前槽等待下一轮
task.remainingRounds--;
reEnqueueTasks.offer(task);
System.out.printf("任务 [%s] 剩余轮次 -1,当前剩余 [%d],等待下一轮%n",
task.taskName, task.remainingRounds);
} else {
// 轮次已到:执行任务
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 将需要重新入队的任务放回当前槽,等待下一轮处理
while ((task = reEnqueueTasks.poll()) != null) {
currentBucket.offer(task);
}
}
/**
* 向时间轮添加延迟任务
*
* @param task 待执行的任务逻辑
* @param taskName 任务名称(用于日志输出)
* @param delay 延迟时间(毫秒)
*/
public void newTimeout(Runnable task, String taskName, long delay) {
if (delay < 0) {
throw new IllegalArgumentException("Delay must be non-negative");
}
// 1. 计算任务需要等待的总刻度数 = 延迟时间 / 每个刻度的时间单位
long totalTickCount = delay / tickDuration;
// 2. 计算剩余轮次 = 总刻度数 / 槽数量(需要完整转动的轮数)
int remainingRounds = (int) (totalTickCount / wheelSize);
// 3. 计算目标槽位 = (当前指针位置 + 总刻度数 % 槽数量) % 槽数量
int targetIndex = (currentIndex + (int) (totalTickCount % wheelSize)) % wheelSize;
// 4. 封装任务并添加到目标槽的队列中
TimerTask timerTask = new TimerTask(task, taskName, remainingRounds);
wheel[targetIndex].offer(timerTask);
System.out.printf("任务 [%s] 已添加到槽 [%d],延迟 [%dms],总刻度 [%d],剩余轮次 [%d]%n",
taskName, targetIndex, delay, totalTickCount, remainingRounds);
}
/**
* 停止时间轮:终止工作线程,释放资源
*/
public void stop() {
running = false;
// 唤醒等待的工作线程,确保正常退出
synchronized (lock) {
lock.notify();
}
try {
workerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// -------------------------- 测试主函数 --------------------------
public static void main(String[] args) throws InterruptedException {
// 初始化时间轮:每个刻度 100ms,共 10 个槽,总周期 1000ms
HashedWheelTimer timer = new HashedWheelTimer(100, 10);
// 任务1:延迟 200ms(2 刻度,0 轮次 → 直接执行)
timer.newTimeout(() -> System.out.println("任务1执行:延迟 200ms(0 轮次)"), "任务1", 200);
// 任务2:延迟 1200ms(12 刻度,1 轮次 → 等待 1 轮后执行)
timer.newTimeout(() -> System.out.println("任务2执行:延迟 1200ms(1 轮次)"), "任务2", 1200);
// 任务3:延迟 2500ms(25 刻度,2 轮次 → 等待 2 轮后执行)
timer.newTimeout(() -> System.out.println("任务3执行:延迟 2500ms(2 轮次)"), "任务3", 2500);
// 任务4:延迟 6000ms(60 刻度,6 轮次 → 等待 6 轮后执行)
timer.newTimeout(() -> System.out.println("任务4执行:延迟 6000ms(6 轮次)"), "任务4", 6000);
// 等待所有任务执行完成
Thread.sleep(10000);
// 停止时间轮
timer.stop();
}
}