Go sync.Map 源码实现
以下源码是Go 1.11版本,源码v1.11 github
简单实现
Map本身不是并发安全,如果设计一个简单的并发安全map,我们可以定一个结构体,map+读写锁
type syncMap struct{
lock sync.RWMutex
m map[interface{}]interface{}
}
读的时候加读锁,写的时候加写锁。实现简单,但并发写性能较差,适合读多写少。
Go v1.9提供了并发安全sync.Map,利用CAS无锁技术,增强性能,当然也是对读多写少做的优化。如果写多,还是会频繁加锁。一起看看实现把~
源码分析
阅读这里的源码,代码量并不大。但想捋顺关系也并不容易,建议debug观察。
核心结构
type Map struct {
// 带锁写dirty结构使用
mu Mutex
// read域,原子性+可见性
read atomic.Value // readOnly
// 全部map数据,改数据都体现到这里
dirty map[interface{}]*entry
// 读操作超过miss次数,则把dirty map赋值给read域,dirty置为nil
misses int
}
// immutable结构,注意map的value是指针,指针可以改
type readOnly struct {
m map[interface{}]*entry
amended bool // 如果dirty有,而read.m没有 则为true
}
// 删除状态 map的value值
var expunged = unsafe.Pointer(new(interface{}))
// map的value指针具体值,map的value是*entry
type entry struct {
p unsafe.Pointer // *interface{}
}
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
结构图示
方法介绍Load、Store、Delete,其他类似。
Store
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
// CAS操作替换value
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
// dirty重置,从read拷贝过来
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 去掉删除态的value 这里有nil和expunged的区别
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
- 从read map读,如果key能找到,并且没有被删除,则直接CAS修改(改旧值性能比较高)。两个map共享value值,改完即可。
- 加锁后,双重检查。如果read map中存在且删除了,那么放到dirty里。原子改value。
- 如果read map没有,dirty map有,直接改drity里value
- 如果read dirty都没有,是新增key。这个key是第一个key,则需要重置dirty(read拷贝过来),把amended置为true。并把这个kv存储下来。解锁。
Load
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
- 从read map读,如果读到了直接atomic加载返回
- 如果read中没有,并且read.amended 是 false,证明dirty也没有,返回
- 如果read.amended 是 true,尝试从dirty读,当然也可能没有。
- dirty是普通map,需要加锁。miss++处理,如果超过dirty长度阈值,晋升处理。把dirty赋值给read,并把dirty置为nil,miss置为0
Delete
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
- delete操作因为要返回旧值,所以先load再删除
- 从read map读,如果read里存在,删除处理。这个删除是把entry.p 置为nil
- 如果read map没有,但read.amended 是true,说明dirty可能存在。
- 加锁后双重检测,删除dirty下的key。miss++,到达阈值后晋升read。
增删改查图示
这个图示总结的很好,把每种变更情况表示出来,便于理解。
总结
简单概括实现:
-
Map结构
- 只读map域
- dirty map域
- 更新锁
- miss统计以便刷新只读域
-
只读map的entry和dirty的entry,在内存是共享的
-
只读map也是可以CAS更新的,更新的是value的entry指针。
当读map存在这个key,并且未删除,就可以CAS更新
-
read map重置,dirty上升
dirty map赋值给read,dirty置为nil。
-
dirty map重置
dirty上升后会置为nil,当再store新key时,会生成dirty,数据来源从read复制非删除的value给dirty。
-
利用cas无锁
-
dirty修改时要加锁
-
删除态的 nil和expunged的区别