Go 并发设计思路--非阻塞缓存
Go开发中经常遇到并发设计的场景,伴随着一个绕不过的问题就是并发安全。
并发安全设计几种方式
- 提前加载好
- 利用锁
- 把对共享变量的读写收敛到一个goroutine里,利用channel和其他goroutine通信
下面这个例子 – 非阻塞缓存,很好的展示了并发设计的解决思路,以及如何规避并发安全的问题。
缓存的初始版本(非并发安全)
package mem
type Memo struct {
f Func
cache map[string]result
}
type result struct {
value interface{}
err error
}
type Func func(string) (interface{}, error)
func New(f Func) *Memo {
return &Memo{
f: f,
cache: make(map[string]result),
}
}
/**
非并发安全
*/
func (m *Memo) GetValue(key string) (interface{}, error) {
res, ok := m.cache[key]
if !ok {
res.value, res.err = m.f(key)
m.cache[key] = res
}
return res.value, res.err
}
第二个版本(并发安全)
解决并发安全思路一: 加锁
type Memo struct {
f Func
cache map[string]result
mu *sync.Mutex // 加锁
}
func (m *Memo) GetValue(key string) (interface{}, error) {
m.mu.Lock() // 锁
res, ok := m.cache[key]
if !ok {
res.value, res.err = m.f(key)
m.cache[key] = res
}
m.mu.Unlock() // 释放锁
return res.value, res.err
}
这种加锁方式,虽然解决了并发安全,但有什么问题?
每个调用GetValue()的方法进来都要等待锁释放,并且锁把慢函数f()包括了进去。每个慢函数执行中,都会影响到其他goroutine获取缓存,相当于每个f()操作都被锁给串行了。 如何解决?
第三个版本(性能优化)
把慢函数放到锁外面
func (m *Memo) GetValue(key string) (interface{}, error) {
m.mu.Lock()
res, ok := m.cache[key]
m.mu.Unlock()
if !ok {
res.value, res.err = m.f(key)// 可以并发执行了
m.mu.Lock()
m.cache[key] = res
m.mu.Unlock()
}
return res.value, res.err
}
有两个用锁的地方:获取m.cache时用一次锁,塞入m.cache时再用一次锁。
这样,当这个goroutine执行慢函数时(或者说两次锁间隙中),其他goroutine想要获取缓存时,争用到锁影响小了,性能更好。
这仍然带来什么问题呢? 可能会带来重复覆盖。同样的key有多个goroutine调用,这样都走到了f(),会相继覆盖这个缓存。
第四个版本(进一步优化)
解决重复抑制(duplicate suppression)的问题
重复覆盖的问题如何解决? 可以利用关闭channel的通知作用
goroutine A 在处理某个key时,其他goroutine在访问这个key时,使用 <-ch
, 当 A处理完后,close(ch)
即可通知到其他goroutine处理完毕
这里需要改造下cache字段,值改为 entry指针,entry结构中包含 channel (该channel只起关闭通知作用,不需要放元素)
type Memo struct {
f Func
mutex sync.Mutex
cache map[string]*entry
}
type entry struct {
res result
ch chan struct{}
}
type result struct {
value interface{}
err error
}
type Func func(key string) (interface{}, error)
func New(f Func) *Memo {
return &Memo{
f: f,
cache : make(map[string]*entry),
}
}
func (m * Memo) GetValue(key string) (interface{}, error) {
m.mutex.Lock()
e, ok := m.cache[key]
if !ok {
// 第一次访问key
e = &entry{ch : make(chan struct{})}
// 技巧:先把entry的channel放进去 待慢函数f()后再放res
m.cache[key] = e
m.mutex.Unlock()
e.res.value, e.res.err = m.f(key)
// 放好res后,关闭channel,通知其他goroutine
close(e.ch)
} else {
// 重复访问该key
m.mutex.Unlock()
// 阻塞 等待数据res填充完毕 可以返回
<- e.ch
}
return e.res.value, e.res.err
}
关闭channel的通知机制确保了:第一个访问key的goroutine写入res事件一定比重复访问读取到的要早。
这个版本的并发、重复抑制、非阻塞的缓存OK了!
以上是Go的一个并发设计案例的整体思路,兼顾性能和并发安全。值得好好体会。
参考:
《Go程序设计语言》