syncmap
go 標准庫提供的sync.Map是一個並發安全的 map,使用它時不需要使用鎖之類的方式來控制,其實現不算特別復雜,去掉注釋總共也就兩三百行代碼。
結構
type Map struct {
mu Mutex
read atomic.Pointer[readOnly]
dirty map[any]*entry
misses int
}它總共只有四個字段,分別如下
read,只讀的 map,可以理解為對dirty的緩存dirty,一個普通的 mapmisses,訪問read時沒有命中的次數mu,保護dirty的並發安全
read是sync.readonly類型,其內部依舊是一個 map,其中的amended字段表示dirty是否包含read所沒有的 key。
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.
}另外entry類型結構如下,p是一個指向 value 的指針。
type entry struct {
p atomic.Pointer[any]
}對於一個 entry 而言,它有三種可能的情況
- 正常情況,存放了對應的值
p為nil,表示該鍵值對已被刪除,此時 dirty 可能為空,或者其可能依舊存在於 dirty 中。p == expunged,expunged是一個空的接口對象,同樣代表了鍵值對已經被刪除且不存在於 dirty 中。
標准庫 map 的並發安全是通過讀寫分離來實現的,read和dirty所存儲的entry指針都是指向的同一片 value,read是只讀的,所以多個協程訪問時也不會有安全問題,dirty是可以被修改的,受到互斥鎖的保護,misses記錄了 key 訪問沒有命中的次數,當次數累計到一定的值後,當前的dirty就會轉變為read,misses清零,這就是sync.Map大致的工作邏輯,後續會對其操作進行更加細致的分析。

讀
讀操作對應Map.Load方法,代碼如下
func (m *Map) Load(key any) (value any, ok bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}它首先會訪問 read,如果存在的話就直接返回,否則就會去嘗試持有mu互斥鎖,然後再去訪問 read,因為在獲得鎖的期間 dirty 有可能晉升為 read,倘若還是沒有找到,最終就會去訪問 dirty,並記錄一次 miss,然後解鎖。
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}通過missLocked方法可以看出,dirty 晉升為 read 的閾值條件是m.misses >= len(m.dirty)。
寫
寫操作對應的是Store方法,不過實際上也是由Swap方法來完成,previous代表著先前的值,loaded表示 key 是否存在。
func (m *Map) Swap(key, value any) (previous any, loaded bool)寫操作的流程分為兩部分,如果訪問的 key 存在於 read 中的話,那麼就會直接獲取對應的entry,然後通過 CAS 來更新entry的值,期間不需要上鎖。
read := m.loadReadOnly()
if e, ok := read.m[key]; ok {
if v, ok := e.trySwap(&value); ok {
if v == nil {
return nil, false
}
return *v, true
}
}在自旋的期間,如果p == expunged則代表著該 key 已經被刪除了,就會直接返回。
func (e *entry) trySwap(i *any) (*any, bool) {
for {
p := e.p.Load()
if p == expunged {
return nil, false
}
if e.p.CompareAndSwap(p, i) {
return p, true
}
}
}如果 key 不存在於 read 中,就會嘗試獲取鎖來進行接下來的操作,接下來分三種情況。第一種情況,在獲取鎖的期間 dirty 晉升為了 read,如果訪問到的entry是expunged,則說明它已經被刪除了,且不存在於 dirty 中,這時需要將其添加到 dirty 中,然後再存儲對應的值。
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
}第二種情況,read 中沒有,但是 dirty 中有,也是直接存儲對應的值
if e, ok := m.dirty[key]; ok {
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
}第三種情況,read 中沒有,dirty 中也沒有,在這裡如果read.amended為false的話,代表著 dirty 是空的,然後會使用m.dirtyLocked將 read 中所有未刪除的鍵值對復制到 ditry 中,然後將read.amended標記為true,最後會直接新建一個 entry 來存放對應的值。
else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(&readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}刪
刪除操作對應的是LoadAndDelete方法,它的思路與讀操作幾乎完全一致,只是多了一個delete函數的調用。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}刪除鍵值對的時候永遠只會對 ditry 執行delete操作,對應 read 而言,只會將它所存儲的 entry 的值修改為nil。
func (e *entry) delete() (value any, ok bool) {
for {
p := e.p.Load()
if p == nil || p == expunged {
return nil, false
}
if e.p.CompareAndSwap(p, nil) {
return *p, true
}
}
}遍歷
遍歷操作對應著Range方法
func (m *Map) Range(f func(key, value any) bool) {
read := m.loadReadOnly()
if read.amended {
m.mu.Lock()
read = m.loadReadOnly()
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(&read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}在遍歷時只會遍歷 read,如果read.amended為true,代表 read 中的 key 有缺失,這時會直接將 ditry 晉升為 read,然後通過for range循環來遍歷,並對每一個鍵值對調用回調函數。
性能
sync.Map采用了讀寫分離的方式來進行並發控制,它更適合讀多寫少的場景,因為在大部分情況下訪問一個鍵值對的時候不需要加鎖。但是如果要新增一個元素的話,就需要持有一個全局鎖,它會阻塞當前 map 的所有操作,這就導致了寫性能的低下,所以sync.Map並不適用於所有情況,對於讀少寫多的情況,可以采用分段鎖的方式來實現,這樣可以避免阻塞全局,這裡推薦一個開源實現orcaman/concurrent-map: a thread-safe concurrent map for go (github.com),采用分片的方式實現,且支持泛型,在性能和使用體驗上都會好一些。
