性能优化之-批处理提升吞吐量
在高并发业务场景中,单条数据处理的方式往往会因频繁的IO操作、资源调度导致性能瓶颈,批处理是提升系统吞吐量的核心手段。比如kafka生产者有这种应用场景。
当然批处理这种性能优化是对单机应用的,如果考虑分布式应对并发直接采用kafka这种MQ就好。
本文将实现一个简易的批处理工具,并介绍设计思路。
一、批处理工具的关键设计
- 数据安全:线程安全,并保证数据不丢失(停机时的剩余消费);
- 高吞吐量:积攒一批来处理;
- 低延迟:虽然积攒一批,但支持通过定时触发机制,控制数据最大处理延迟;
核心点:基于阻塞队列缓存数据,通过「定时触发 + 队列满触发」双策略触发批处理,最终调用自定义消费逻辑处理批量数据。
二、核心实现与关键问题解析
代码
批处理工具的核心由三部分组成:阻塞队列(数据缓存)、定时线程池(定时触发)、消费逻辑(自定义处理),基础代码结构如下:
public class BatchTool<T> {
private ScheduledExecutorService scheduledExecutorService;
private BlockingQueue<T> queue;
private Consumer<List<T>> consumer;
// 锁batch()方法,避免调用线程和定时线程并发执行
private final Object batchLock = new Object();
public BatchTool(Integer maxSize, Integer intervalTime, TimeUnit timeUnit, Consumer<List<T>> consumer) {
// 参数校验
this.queue = new LinkedBlockingQueue<>(maxSize);
// 消费数据处理
this.consumer = consumer;
// 固定频率取走队列数据
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "scheduledExecutor"));
scheduledExecutorService.scheduleAtFixedRate(this::batch, intervalTime, intervalTime, timeUnit);
}
/**
* 提交任务元素
* @param e
* @throws InterruptedException
*/
public void put(T e) throws InterruptedException {
// offer 加锁处理,满了放不进去返回false,则进行批处理
if (!queue.offer(e)) {
batch();
// 批处理后再阻塞放进去该元素
queue.put(e);
}
}
/**
* 取出队列数据
* 封装调用方的批处理方法
*/
private void batch() {
List<T> list = new ArrayList<>();
// 从 drainTo()方法的原子性,不加锁处理也不影响数据安全,最好加上
// 尽量减少锁范围
synchronized (batchLock) {
if (queue.isEmpty()) {
return;
}
// 内部加锁处理写到list
queue.drainTo(list);
}
if (list.isEmpty()) {
return;
}
try {
// 调用方可异步处理,避免阻塞;或者调用方传入线程池参数;
consumer.accept(list);
} catch (Exception e) {
e.printStackTrace();
}
}
public void shutdown() {
// 队列剩余数据消费掉
batch();
if (scheduledExecutorService == null) {
return;
}
try {
// 温和关闭,不会再进来任务
scheduledExecutorService.shutdown();
// 等待线程池关闭(最多等待10秒)
if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
// 强制关闭
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
}
}
}
测试:
public class BatchToolTest {
@Data
@AllArgsConstructor
static class Obj {
private String name;
}
/**
* 调用方处理方法
* @param objList
*/
public void consume(List<Obj> objList) {
if (CollectionUtils.isEmpty(objList)) {
return;
}
objList.forEach(e -> {
System.out.println(e + " " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
});
}
/**
* @param args
* @throws InterruptedException
* 输出:
* 前5个元素是主线程输出
* 后5个元素是定时线程输出
*/
public static void main(String[] args) throws InterruptedException {
BatchToolTest batchToolTest = new BatchToolTest();
BatchTool<Obj> objBatchTool = new BatchTool<>(5, 10, TimeUnit.SECONDS, batchToolTest::consume);
for (int i = 0; i < 10; i++) {
objBatchTool.put(new Obj(String.valueOf(i)));
}
// 测试到时间读队列
TimeUnit.SECONDS.sleep(15);
objBatchTool.shutdown();
}
}
问题1:线程安全保障
批处理工具的线程安全核心依赖两点:
(1)阻塞队列的原子操作
LinkedBlockingQueue 是线程安全的阻塞队列,其 offer()、put()、drainTo() 方法均通过内置全局锁保证原子性,避免多线程操作时的数据错乱。
问题2:性能与吞吐量优化
(1)双触发策略平衡延迟与吞吐量
- 队列满触发:当队列达到最大容量时,
put()方法主动触发批处理; - 队列长度到达阈值触发:也可以适当优化,当队列达到阈值时,
put()方法可以提前主动触发批处理;这样突发流量时,offer()成功率更高; - 定时触发:固定间隔触发批处理,保证数据最大处理延迟可控(如1秒触发一次,数据最多延迟1秒处理)。
(2)非阻塞优先+阻塞兜底的入队策略
put() 方法优先使用 offer() 非阻塞入队,仅当队列满时才触发批处理并通过 put() 阻塞入队,既保证高并发下的吞吐量,又避免数据丢失。
问题3:优雅关闭的实现
尽量避免“数据丢失”和“服务卡死”:
- 先消费剩余数据:调用
batch()消费队列中未处理的数据; - 温和关闭线程池:
shutdown()拒绝新任务,等待已提交的定时任务完成; - 超时兜底:
awaitTermination(10, TimeUnit.SECONDS)阻塞等待10秒,超时则调用shutdownNow()强制关闭;
三、总结
- 线程安全:依赖
LinkedBlockingQueue原子操作 +batch()方法同步锁,避免数据错乱和空执行; - 性能优化:双触发策略(队列满+定时)平衡延迟与吞吐量,非阻塞入队优先保证高并发;
- 优雅关闭:先消费剩余数据 → 温和关闭 → 超时强制关闭,尽可能保证数据不丢失、服务不卡死