MQ延时消息的设计思路
我们在日常网购,下单未支付的时候通常都会有15-30分钟左右的有效期,如果超时不支付,系统会自动取消这笔订单。
那么问题来了:这个到期自动取消的动作是怎样触发的呢?
朴素想法 - DB 轮询
- 下单时,把该笔订单的 超时时间戳+下单未支付状态 记下来
- 起个后台任务轮询这些订单,判断时间是否到达超时
这种实现简单但并不优雅:可能很多都是空轮询,数据量大时对DB 也不友好。
那有没有更好的方案处理这一类延时任务的方案呢?
Redis zset 轮询
如上轮询DB 的方式,如果这类场景需求比较多,数据量又大的话对DB 并不友好。但有更适合支撑高并发读的组件如Redis。
Redis 在 zset 数据结构上,可以针对到期时间获取数据,而不像数据库这种针对整张表的遍历获取。
- DB 数据同步一份到 Redis,使用ZSet 数据结构,score 使用到期的时间戳,按时间从小到大排。
> zadd key expireTimestamp value
- 定时轮询 Redis 集合,根据当前时间获取到期数据,支持分页
> ZRANGEBYSCORE key minScore maxScore limit offset count
- 如果取出来,业务处理成功后可删除;如果暂不删除,则要控制好每次轮询的score(时间)参数。
MQ 延时消息
如上无论是通过轮询DB 还是轮询Redis,都是业务侵入式的去做这个消息是否到期的逻辑判断。
其实,延时消息这类我们可以选择使用消息中间件如Rocket MQ(Kafka 社区版不支持),当消息到期时会通知业务,这样业务侧不再需要做轮询处理了。
当然实际上,也是一个轮询,只是说轮询的动作从业务侧转移到了中间件,但有了中间件自然就可以让业务侧解放出来。
延时消息 设计思路
如上说,Kafka 不支持延时消息,那假设我们要基于Kafka 改造使其支持呢?思考一下 ^_^
当前背景
Kafka 当前实现:生产者把消息写入Broker日志,消费者会立即(暂且可以这样理解)拉取到。
思路1
一个比较直接的想法,针对延时消息,在生产者和Broker之间再加一层服务(Delay Server),拦截一道。
-
先改造业务侧生产者(或者在生产者后边加一层代理服务),针对延时消息,组装好数据后,发送给Delay Server,而不再发给Kafka Broker
消息要带上 原topic、过期时间
-
Delay Server 把消息先存起来。意味着需要一个存储组件(就如 Redis)
-
一个轮询服务,找出到期的消息
-
Delay Server 集成上生产者客户端,把这些到期的消息再发给目标topic,等消费者消费即可
思路1 图示
思路2
注意到思路1中,延时消息是直接从业务侧的生产端发到了Delay Server,这就要求Delay Server至少有高并发的写入能力(因为替代了MQ写)。
那怎么更好解决这个问题呢?其实就是仍然先把消息写到Broker日志中,如先存到一个特别的topic。再让Delay Server做消费、存储、发送给原topic。
-
先改造业务侧生产者,针对延时消息,把消息发到一个特别的topic
消息仍要带上 原topic、过期时间
-
Delay Server 集成消费者,拉取这个topic数据,把消息存储到存储组件
-
一个轮询服务,找出到期的消息
-
Delay Server 集成上生产者,把这些到期的消息再发给目标topic,等消费者消费即可
思路2 图示
DDMQ 实现方式
DDMQ (滴滴基于Rocket MQ)就是采用了思路2的设计方式,我们看下架构图:
DDMQ 架构图
步骤
- DDMQ 的生产 SDK 将延迟消息生产到 PProxy 中
- PProxy 会将延迟消息写入到提前配置好的 inner topic 中。
- Chronos(delay server)消费 inner topic 的消息并存储到内置的 RocksDB 存储引擎中。
- Chronos 内置的时间轮服务会将到期的消息再次发送给 DDMQ (原topic) 供业务方消费。
RocketMQ 实现方式
RocketMQ 自身支持延时消息投递,延时服务原生在Broker。
RocketMQ 不支持秒级精度,默认支持18个level的延迟消息。Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列(每个Broker节点都有18个队列)。
这个内部主题 SCHEDULE_TOPIC_XXXX 就是思路2中先暂存的针对延时消息的特别的topic。
步骤
- 生产者发延时消息
- broker内部针对延时消息,修改生产者中消息topic名称和队列信息,转发消息到延迟主题 SCHEDULE_TOPIC_XXXX 的CosumeQueue中
- 延迟服务 消费SCHEDULE_TOPIC_XXXX消息,将信息重新存储到CommitLog中
- 将到期消息投递到目标topic中,消费者可见
整体步骤其实和思路2类似。
延时数据结构
-
DelayQueue
Java 自身提供,内部持有PriorityQueue,自然底层是基于二叉堆排序。因此写入时间复杂度比较高O(logN)
-
TimeWheel (时间轮)
时间轮算法,底层循环链表支撑每个到期任务,写入复杂度低。常见于延时任务中的数据结构 如Netty、Kafka、Quartz等组件中使用,后边单独再介绍。
DelayQueue 用法
Java自身提供了延时队列的数据结构 DelayQueue,内部元素要实现Delayed (剩余时间) 和 Comparable (DelayQueue 内部持有 PriorityQueue,排序用)接口。
public class DelayTest {
public static void main(String[] args) {
Queue<Entry> queue = new DelayQueue<>();
Entry entry = new Entry("甲", System.currentTimeMillis() + 5000);
Entry entry1 = new Entry("乙", System.currentTimeMillis() + 2000);
Entry entry2 = new Entry("丙", System.currentTimeMillis() + 3000);
Entry entry3 = new Entry("丁", System.currentTimeMillis() + 5000);
new Thread(() -> {
queue.offer(entry);
queue.offer(entry1);
queue.offer(entry2);
queue.offer(entry3);
}).start();
new Thread(() -> {
while (true) {
// getDelay到期才能poll出来,否则返回null
Entry poll = queue.poll();
if (poll != null) {
System.out.println(poll.name);
} else {
try {
System.out.println("into sleep");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
static class Entry implements Delayed {
private String name;
/**
* 执行时间
*/
private Long timestamp;
public Entry(String name, Long timestamp) {
this.name = name;
this.timestamp = timestamp;
}
/**
* getDelay到期(小于等于0)才能poll出来
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(timestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 优先级队列排序
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}
}