Skip to content

syncmap

go 標准庫提供的sync.Map是一個並發安全的 map,使用它時不需要使用鎖之類的方式來控制,其實現不算特別復雜,去掉注釋總共也就兩三百行代碼。

結構

go
type Map struct {
    mu Mutex
    read atomic.Pointer[readOnly]
    dirty map[any]*entry
    misses int
}

它總共只有四個字段,分別如下

  • read,只讀的 map,可以理解為對dirty的緩存
  • dirty,一個普通的 map
  • misses,訪問read時沒有命中的次數
  • mu,保護dirty的並發安全

readsync.readonly類型,其內部依舊是一個 map,其中的amended字段表示dirty是否包含read所沒有的 key。

go
type readOnly struct {
  m       map[any]*entry
  amended bool // true if the dirty map contains some key not in m.
}

另外entry類型結構如下,p是一個指向 value 的指針。

go
type entry struct {
  p atomic.Pointer[any]
}

對於一個 entry 而言,它有三種可能的情況

  1. 正常情況,存放了對應的值
  2. pnil,表示該鍵值對已被刪除,此時 dirty 可能為空,或者其可能依舊存在於 dirty 中。
  3. p == expungedexpunged是一個空的接口對象,同樣代表了鍵值對已經被刪除且不存在於 dirty 中。

標准庫 map 的並發安全是通過讀寫分離來實現的,readdirty所存儲的entry指針都是指向的同一片 value,read是只讀的,所以多個協程訪問時也不會有安全問題,dirty是可以被修改的,受到互斥鎖的保護,misses記錄了 key 訪問沒有命中的次數,當次數累計到一定的值後,當前的dirty就會轉變為readmisses清零,這就是sync.Map大致的工作邏輯,後續會對其操作進行更加細致的分析。

讀操作對應Map.Load方法,代碼如下

go
func (m *Map) Load(key any) (value any, ok bool) {
  read := m.loadReadOnly()
  e, ok := read.m[key]
  if !ok && read.amended {
    m.mu.Lock()
    read = m.loadReadOnly()
    e, ok = read.m[key]
    if !ok && read.amended {
      e, ok = m.dirty[key]
      m.missLocked()
    }
    m.mu.Unlock()
  }
  if !ok {
    return nil, false
  }
  return e.load()
}

它首先會訪問 read,如果存在的話就直接返回,否則就會去嘗試持有mu互斥鎖,然後再去訪問 read,因為在獲得鎖的期間 dirty 有可能晉升為 read,倘若還是沒有找到,最終就會去訪問 dirty,並記錄一次 miss,然後解鎖。

go
func (m *Map) missLocked() {
  m.misses++
  if m.misses < len(m.dirty) {
    return
  }
  m.read.Store(&readOnly{m: m.dirty})
  m.dirty = nil
  m.misses = 0
}

通過missLocked方法可以看出,dirty 晉升為 read 的閾值條件是m.misses >= len(m.dirty)

寫操作對應的是Store方法,不過實際上也是由Swap方法來完成,previous代表著先前的值,loaded表示 key 是否存在。

go
func (m *Map) Swap(key, value any) (previous any, loaded bool)

寫操作的流程分為兩部分,如果訪問的 key 存在於 read 中的話,那麼就會直接獲取對應的entry,然後通過 CAS 來更新entry的值,期間不需要上鎖。

go
read := m.loadReadOnly()
if e, ok := read.m[key]; ok {
    if v, ok := e.trySwap(&value); ok {
        if v == nil {
            return nil, false
        }
        return *v, true
    }
}

在自旋的期間,如果p == expunged則代表著該 key 已經被刪除了,就會直接返回。

go
func (e *entry) trySwap(i *any) (*any, bool) {
  for {
    p := e.p.Load()
    if p == expunged {
      return nil, false
    }
    if e.p.CompareAndSwap(p, i) {
      return p, true
    }
  }
}

如果 key 不存在於 read 中,就會嘗試獲取鎖來進行接下來的操作,接下來分三種情況。第一種情況,在獲取鎖的期間 dirty 晉升為了 read,如果訪問到的entryexpunged,則說明它已經被刪除了,且不存在於 dirty 中,這時需要將其添加到 dirty 中,然後再存儲對應的值。

go
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
    if e.unexpungeLocked() {
        m.dirty[key] = e
    }
    if v := e.swapLocked(&value); v != nil {
        loaded = true
        previous = *v
    }
}

第二種情況,read 中沒有,但是 dirty 中有,也是直接存儲對應的值

go
if e, ok := m.dirty[key]; ok {
    if v := e.swapLocked(&value); v != nil {
        loaded = true
        previous = *v
    }
}

第三種情況,read 中沒有,dirty 中也沒有,在這裡如果read.amendedfalse的話,代表著 dirty 是空的,然後會使用m.dirtyLocked將 read 中所有未刪除的鍵值對復制到 ditry 中,然後將read.amended標記為true,最後會直接新建一個 entry 來存放對應的值。

go
else {
    if !read.amended {
        // We're adding the first new key to the dirty map.
        // Make sure it is allocated and mark the read-only map as incomplete.
        m.dirtyLocked()
        m.read.Store(&readOnly{m: read.m, amended: true})
    }
    m.dirty[key] = newEntry(value)
}

func (m *Map) dirtyLocked() {
  if m.dirty != nil {
    return
  }

  read := m.loadReadOnly()
  m.dirty = make(map[any]*entry, len(read.m))
  for k, e := range read.m {
    if !e.tryExpungeLocked() {
      m.dirty[k] = e
    }
  }
}

刪除操作對應的是LoadAndDelete方法,它的思路與讀操作幾乎完全一致,只是多了一個delete函數的調用。

go
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
  read := m.loadReadOnly()
  e, ok := read.m[key]
  if !ok && read.amended {
    m.mu.Lock()
    read = m.loadReadOnly()
    e, ok = read.m[key]
    if !ok && read.amended {
      e, ok = m.dirty[key]
      delete(m.dirty, key)
      m.missLocked()
    }
    m.mu.Unlock()
  }
  if ok {
    return e.delete()
  }
  return nil, false
}

刪除鍵值對的時候永遠只會對 ditry 執行delete操作,對應 read 而言,只會將它所存儲的 entry 的值修改為nil

go
func (e *entry) delete() (value any, ok bool) {
  for {
    p := e.p.Load()
    if p == nil || p == expunged {
      return nil, false
    }
    if e.p.CompareAndSwap(p, nil) {
      return *p, true
    }
  }
}

遍歷

遍歷操作對應著Range方法

go
func (m *Map) Range(f func(key, value any) bool) {
  read := m.loadReadOnly()
  if read.amended {
    m.mu.Lock()
    read = m.loadReadOnly()
    if read.amended {
      read = readOnly{m: m.dirty}
      m.read.Store(&read)
      m.dirty = nil
      m.misses = 0
    }
    m.mu.Unlock()
  }

  for k, e := range read.m {
    v, ok := e.load()
    if !ok {
      continue
    }
    if !f(k, v) {
      break
    }
  }
}

在遍歷時只會遍歷 read,如果read.amendedtrue,代表 read 中的 key 有缺失,這時會直接將 ditry 晉升為 read,然後通過for range循環來遍歷,並對每一個鍵值對調用回調函數。

性能

sync.Map采用了讀寫分離的方式來進行並發控制,它更適合讀多寫少的場景,因為在大部分情況下訪問一個鍵值對的時候不需要加鎖。但是如果要新增一個元素的話,就需要持有一個全局鎖,它會阻塞當前 map 的所有操作,這就導致了寫性能的低下,所以sync.Map並不適用於所有情況,對於讀少寫多的情況,可以采用分段鎖的方式來實現,這樣可以避免阻塞全局,這裡推薦一個開源實現orcaman/concurrent-map: a thread-safe concurrent map for go (github.com),采用分片的方式實現,且支持泛型,在性能和使用體驗上都會好一些。

Golang學習網由www.golangdev.cn整理維護