简易实现固定窗口限流
限流器有多种算法,比如固定窗口、滑动窗口、漏桶、令牌桶等。比如Guava的限流器采用的令牌桶。
我们这里实现一个简易的固定窗口的限流器,来初体验下限流器的作用和实现,也顺道对比Go和Java简易实现的代码哈。
通常我们限制qps,比如约定对某个接口的请求,1s内最多打过来100次请求。1s和100次就是我们做限流的主要限制参数,也就是限流器的构造参数。
Go 实现固定时间窗口限流器
limitflow.go
定义一个结构:
- 一个时间区间(需要定时清0,time.Tick非常方便)
- 限制次数
- 请求次数
- 锁(修改内部属性,这个场景读写比相似,不需要显示写读写锁,读写锁更耗性能)
代码
package limitflow
import (
"fmt"
"sync"
"time"
)
type FlowLimit struct {
l sync.Mutex
interval time.Duration
total int64
reqnum int64
}
func NewFlowLimit(interval time.Duration, total int64) *FlowLimit {
flowlimit := &FlowLimit{
interval: interval,
total: total,
}
go func() {
// 定时器
tick := time.Tick(interval)
for {
<-tick
fmt.Println("tick...")
// 重置0
flowlimit.Reset()
}
}()
return flowlimit
}
// 请求数+1
func (limit *FlowLimit) Increase() {
limit.l.Lock()
defer limit.l.Unlock()
limit.reqnum += 1
}
// 重置0
func (limit *FlowLimit) Reset() {
limit.l.Lock()
defer limit.l.Unlock()
limit.reqnum = 0
}
// 满足限流否
func (limit *FlowLimit) AccessOk() bool {
limit.l.Lock()
defer limit.l.Unlock()
return limit.reqnum < limit.total
}
做一个web服务,模拟一个接口请求
test.go
package main
import (
"fmt"
"net/http"
"time"
"limitflow"
)
// 测试 new一个限流A
var limitA = limitflow.NewFlowLimit(10*time.Second, 3)
func main() {
server()
}
func testFunc(w http.ResponseWriter, req *http.Request) {
if limitA.AccessOk() {
limitA.Increase()
fmt.Fprintln(w, "welcome")
} else {
fmt.Fprintln(w, "flow limit")
}
}
func server() {
http.HandleFunc("/test", testFunc)
http.ListenAndServe(":8777", nil)
}
Java 实现等效的固定时间窗口限流器
Java 实现,保持固定时间窗口 + 互斥锁保护计数器 + 定时重置的设计。
如 ReentrantLock 替代 sync.Mutex, ScheduledExecutorService 替代 time.Tick。
代码
public class FlowLimit {
// 互斥锁,保护reqNum的并发修改(对应Go的sync.Mutex)
private final ReentrantLock lock = new ReentrantLock();
// 时间窗口间隔(对应Go的interval)
private final long interval;
// 时间窗口内允许的最大请求数(对应Go的total)
private final long total;
// 当前窗口的请求计数(对应Go的reqnum)
private long reqNum;
// 定时任务线程池(用于重置计数器)
private final ScheduledExecutorService scheduler;
/**
* 构造方法
* @param interval 时间窗口间隔(单位:毫秒)
* @param total 窗口内最大请求数
*/
public FlowLimit(long interval, long total) {
this.interval = interval;
this.total = total;
this.reqNum = 0;
// 初始化定时任务线程池(单线程即可)
this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setName("flow-limit-reset-thread");
thread.setDaemon(true); // 守护线程,避免阻塞程序退出
return thread;
});
// 启动定时任务:每隔interval毫秒重置计数器(对应Go的time.Tick + 循环)
scheduler.scheduleAtFixedRate(
this::resetReqNum,
interval, // 首次执行延迟(与窗口间隔一致)
interval, // 后续执行间隔
TimeUnit.MILLISECONDS
);
}
/**
* 判断是否允许访问(对应Go的AccessOk方法)
* @return true=未限流,false=已限流
*/
public boolean accessOk() {
lock.lock();
try {
if (reqNum < total) {
reqNum += 1;
return true;
}
return false;
} finally {
lock.unlock();
}
}
/**
* 重置请求计数器(定时任务执行)
*/
private void resetReqNum() {
lock.lock();
try {
System.out.println("tick...");
reqNum = 0;
} finally {
lock.unlock();
}
}
/**
* 关闭定时任务(避免内存泄漏)
*/
public void close() {
scheduler.shutdown();
try {
// 等待线程池优雅关闭
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
}
}
// 测试示例
public static void main(String[] args) throws InterruptedException {
// 初始化限流器:1秒窗口,最多3次请求
FlowLimit flowLimit = new FlowLimit(1000, 3);
// 模拟并发请求
for (int i = 0; i < 5; i++) {
new Thread(() -> {
boolean canAccess = flowLimit.accessOk();
if (canAccess) {
System.out.println(Thread.currentThread().getName() + ":访问成功,当前计数=" + flowLimit.reqNum);
} else {
System.out.println(Thread.currentThread().getName() + ":访问失败(已限流)");
}
}, "request-thread-" + i).start();
}
// 等待1个窗口周期,验证计数器重置
Thread.sleep(1100);
System.out.println("===== 窗口重置后 =====");
// 再次模拟请求
new Thread(() -> {
boolean canAccess = flowLimit.accessOk();
if (canAccess) {
System.out.println(Thread.currentThread().getName() + ":访问成功,当前计数=" + flowLimit.reqNum);
} else {
System.out.println(Thread.currentThread().getName() + ":访问失败(已限流)");
}
}, "request-thread-6").start();
// 关闭限流器
flowLimit.close();
}
}
这里Java的锁可以替换成AtomicInteger的CAS实现。