Skip to content

sync.Mutex

Блокировки — это важный примитив синхронизации в операционных системах. Go предоставляет две реализации в своей стандартной библиотеке: мьютекс и блокировка чтения-записи, соответствующие:

  • sync.Mutex, мьютекс: чтение-чтение эксклюзивное, чтение-запись эксклюзивное, запись-запись эксклюзивное
  • sync.RWMutex, блокировка чтения-записи: чтение-чтение общее, чтение-запись эксклюзивное, запись-запись эксклюзивное

Их сценарии использования очень распространены, используются для защиты общей памяти для последовательного доступа и модификации в условиях конкурентности, как показано в следующем примере:

go
import (
	"fmt"
	"sync"
)

func main() {
	var i int
	var wg sync.WaitGroup
	var mu sync.Mutex

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			viewI := i
			mu.Unlock()

			viewI++

			mu.Lock()
			i = viewI
			mu.Unlock()
		}()
	}

	wg.Wait()
	fmt.Println(i)
}

Без защиты блокировкой вывод этой функции может быть разным при каждом выполнении и непредсказуем. Очевидно, в большинстве сценариев мы не хотим, чтобы такие ситуации происходили. Этот пример очень прост для большинства людей, и вы, возможно, уже владеете использованием блокировок, но вы, возможно, не понимаете, как блокировки Go реализованы внутренне. Сам код не сложен, и эта статья предоставит подробное объяснение.

Locker

Прежде чем мы начнём, давайте посмотрим на тип sync.Locker, который является интерфейсом, определённым Go:

go
// A Locker представляет объект, который может быть заблокирован и разблокирован.
type Locker interface {
	Lock()
	Unlock()
}

Методы, которые он предоставляет, очень просты и легки для понимания: блокировка и разблокировка. Однако, поскольку реализация интерфейса в Go лучше, чем соглашение, большинство людей, возможно, никогда не видели его. Здесь он упоминается только кратко, потому что он не так важен. Обе блокировки, обсуждаемые позже, реализуют этот интерфейс.

Mutex

Определение типа мьютекса Mutex расположено в файле sync/mutex.go. Это тип struct:

go
type Mutex struct {
	state int32
	sema  uint32
}

Определения полей:

  • state, поле, представляющее состояние блокировки
  • sema, т.е. семафор, который будет объяснён позже

Давайте сначала поговорим о state:

go
const (
    mutexLocked = 1 << iota // мьютекс заблокирован
    mutexWoken
    mutexStarving
)

state — это 32-битный целый тип. Низкие 3 бита используются для представления трёх состояний выше. Эти три состояния не независимы; они могут сосуществовать.

  • mutexLocked=1, заблокирован
  • mutexWoken=2, пробуждён
  • mutexStarving=4, режим голодания

Высокие 29 бит используются для представления того, сколько goroutine ожидают блокировку. Так что теоретически мьютекс может использоваться одновременно максимум 2^29+1 goroutine. Однако в реальности маловероятно иметь так много goroutine. Даже если каждая занимает только 2KB (начальное пространство стека), пространство памяти, требуемое для создания такого количества goroutine, будет около 1TB.

+-----------------------------------+---------------+------------+-------------+
|              waiter               | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
|              29 bits              | 1 bit         | 1 bit      | 1 bit       |
+-----------------------------------+---------------+------------+-------------+

Мьютекс имеет два режима работы: нормальный режим и режим голодания. Нормальный режим следует порядку, в котором goroutine прибывают в очередь блокирующего ожидания, т.е. FIFO. Это общий случай, а также когда производительность лучшая, потому что все следуют порядку доступа для удержания блокировки без проблем. Режим голодания — это необычный случай. Это голодание относится к ожидающим goroutine, неспособным удерживать блокировку в течение длительного времени и остающимся заблокированными. Это не означает, что сам мьютекс находится в состоянии голодания. Так когда goroutine входят в состояние голодания? Официальные лица приводят пример: есть ранее прибывшая goroutine, которая блокируется, потому что не может удерживать мьютекс. Позже, когда блокировка освобождена и она пробуждена, другая goroutine, которая только что добежала до этой точки, пытается удерживать блокировку (любит врезаться в очередь). Поскольку последняя находится в состоянии выполнения (занимает квант времени CPU), у последней есть высокая вероятность успешно конкурировать за блокировку. В экстремальных случаях может быть много таких goroutine, поэтому только что пробуждённая goroutine никогда не сможет удерживать блокировку (бесконечное врезание в очередь). Она прибыла первой, но никогда не может получить блокировку.

go
const (
	starvationThresholdNs = 1e6
)

Чтобы избежать этой ситуации, Go устанавливает порог ожидания starvationThresholdNs. Если goroutine всё ещё не удерживает блокировку после более чем 1ms, мьютекс входит в режим голодания. В режиме голодания владение мьютексом напрямую передаётся goroutine в начале очереди ожидания. Новые goroutine не будут пытаться удерживать блокировку и вместо этого идут в конец очереди для ожидания. Таким образом, в режиме голодания владение мьютексом удерживается одна за другой goroutine в очереди ожидания (пусть люди в очереди получат блокировку сначала, врезавшиеся идут назад). Когда goroutine успешно удерживает блокировку, если она последняя ожидающая goroutine или время ожидания меньше 1ms, она переключает мьютекс обратно в нормальный режим. Этот дизайн режима голодания избегает того, что некоторые goroutine не могут удерживать блокировку в течение длительного времени и «умирают от голода».

TryLock

Мьютекс предоставляет два метода для блокировки:

  • Lock(), приобрести блокировку в блокирующем режиме
  • TryLock(), приобрести блокировку в неблокирующем режиме

Давайте сначала посмотрим на код TryLock, так как его реализация проще:

go
func (m *Mutex) TryLock() bool {
	old := m.state
	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}

	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}

	return true
}

Он начинается с проверки. Если блокировка уже удерживается или находится в режиме голодания (т.е. много goroutine ожидают блокировку), текущая goroutine не может получить блокировку. В противном случае использует операцию CAS для попытки обновить состояние в mutexLocked. Если операция CAS возвращает false, это означает, что другая goroutine успешно получила блокировку в этот период, поэтому текущая goroutine не может получить блокировку. В противном случае успешно получает блокировку. Из этого кода видно, что вызывающий TryLock() — это тот, кто пытается врезаться в очередь, потому что он напрямую захватывает блокировку независимо от того, есть ли goroutine в ожидании (old может быть не равен 0).

Lock

Ниже код Lock. Он также использует операцию CAS для попытки напрямую удерживать блокировку, но он более «честный». Он напрямую удерживает блокировку только когда нет goroutine, блокирующих и ожидающих (old=0).

go
func (m *Mutex) Lock() {
    // Быстрый путь: захватить разблокированный мьютекс.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
       return
    }
    // Медленный путь (вынесен, чтобы быстрый путь мог быть встроен)
    m.lockSlow()
}

Если обнаруживает goroutine, блокирующие и ожидающие, он «честно» становится в конец очереди и входит в поток вращения lockSlow для ожидания блокировки (ядро мьютекса). Сначала подготавливаются некоторые переменные:

go
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
  • waitStartTime: используется для записи времени начала ожидания, чтобы проверить, входить ли в режим голодания.
  • starving: указывает, была ли текущая goroutine неспособна получить блокировку более чем 1ms.
  • awoke: помечает, была ли текущая goroutine пробуждена.
  • iter: счётчик, записывает количество итераций вращения.
  • old: получает текущее состояние мьютекса.

Затем входит в цикл for для определения, может ли текущая goroutine войти в состояние вращения.

Вращение — это механизм синхронизации нескольких потоков, также известный как активное ожидание. Когда поток не удерживает блокировку, он не напрямую приостанавливает и переключает контекст потока, но входит в холостое вращение. В течение этого процесса он непрерывно занимает кванты времени CPU. Если конкуренция за блокировку низкая или время удержания блокировки очень короткое, выполнение этого действительно может избежать частых переключений контекста потока и эффективно улучшить производительность. Однако это не панацея. В Go злоупотребление вращением может привести к следующим опасным последствиям:

  • Высокое использование CPU: Слишком много вращающихся goroutine потребляют大量 ресурсы CPU, особенно когда блокировка удерживается в течение длительного времени.
  • Влияет на планирование goroutine: Общее количество процессоров P ограничено. Если много вращающихся goroutine занимают P, другие goroutine, выполняющие пользовательский код, не могут быть запланированы вовремя.
  • Проблемы когерентности кэша: Характеристика активного ожидания спин-блокировок заставляет потоки неоднократно читать состояние блокировки в кэше. Если вращающиеся goroutine работают на разных ядрах и состояние блокировки не обновляется в глобальную память вовремя, goroutine читают неточные состояния блокировки. Частая синхронизация когерентности кэша также значительно снижает производительность.

Так что не все goroutine могут войти в состояние вращения. Оно должно пройти следующую строгую проверку:

go
for {
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }
    ...
}

Условия следующие:

  1. Текущая блокировка уже удерживается и не может быть в режиме голодания. В противном случае это означает, что некоторые goroutine не могут получить блокировку в течение длительного времени, поэтому напрямую входит в поток блокировки.

  2. Входит в поток проверки runtime.sync_runtime_canSpin:

    go
     const (
    	active_spin = 4
    )
    func sync_runtime_canSpin(i int) bool {
    	if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
    		return false
    	}
    	if p := getg().m.p.ptr(); !runqempty(p) {
    		return false
    	}
    	return true
    }
  3. Количество вращений меньше runtime.active_spin, по умолчанию 4 раза. Больше раз тратит ресурсы.

  4. Количество ядер CPU больше 1. Вращение на одноядерных системах не имеет смысла.

  5. Текущий gomaxprocs больше суммы_idle P и spinning P плюс 1, означает, что нет достаточных доступных процессоров для вращения.

  6. Локальная очередь текущего P должна быть пуста. В противном случае это означает, что есть другие пользовательские задачи для выполнения, и вращение не может продолжиться.

Если вращение разрешено, вызывает runtime.sync_runtime_doSpin для входа во вращение. На самом деле выполняет инструкцию PAUSE 30 раз.

go
const (
	active_spin_cnt = 30
)

func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
asm
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

Если вращение не может продолжиться, есть только два результата: успешно получить блокировку или войти в очередь ожидания и блокироваться. Однако перед этим есть много вещей для обработки:

  1. Если не в режиме голодания, пытается приобрести блокировку:
go
new := old
if old&mutexStarving == 0 {
	new |= mutexLocked
}
  1. Если блокировка уже занята или теперь в режиме голодания, инкрементирует счётчик ожидающих goroutine:
go
if old&(mutexLocked|mutexStarving) != 0 {
	new += 1 << mutexWaiterShift
}
  1. Если текущая goroutine уже в состоянии голодания и блокировка всё ещё занята, входит в режим голодания:
go
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}
  1. Если текущая goroutine была пробуждена во время вращения, добавляет флаг mutexWoken:
go
if awoke {
	new &^= mutexWoken
}

Затем начинает пытаться обновить состояние блокировки через CAS. Если обновление не удалось, напрямую начинает следующую итерацию:

go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    ...
}else {
    ...
}

Если обновление успешно, начинает следующую проверку:

  1. Исходное состояние не режим голодания, и ни одна goroutine не занимает блокировку. Тогда текущая goroutine может напрямую удерживать блокировку, выйти из потока и продолжить выполнение пользовательского кода.

    go
    if old&(mutexLocked|mutexStarving) == 0 {
    		break
    }
  2. Попытка приобрести блокировку не удалась. Записывает время ожидания, где LIFO если true означает очередь last-in-first-out, в противном случае это FIFO first-in-first-out.

    go
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
    	waitStartTime = runtime_nanotime()
    }
  3. Попытка приобрести семафор, входит в функцию runtime.semacquire1. Если семафор может быть приобретён, напрямую возвращается без блокировки. В противном случае вызывает runtime.gopark для приостановки текущей goroutine и ожидания освобождения семафора.

    go
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  4. Достижение этого шага имеет две возможности: либо напрямую успешно приобрёл семафор, либо только что пробуждён из блокировки и успешно получил семафор. В любом случае семафор успешно получен. Если теперь в режиме голодания, может напрямую получить блокировку.

    go
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    if old&mutexStarving != 0 {
    	delta := int32(mutexLocked - 1<<mutexWaiterShift)
    	if !starving || old>>mutexWaiterShift == 1 {
    		delta -= mutexStarving
    	}
    	atomic.AddInt32(&m.state, delta)
    	break
    }
  5. Если не в режиме голодания, сбрасывает iter и перезапускает поток вращения.

    go
    awoke = true
    iter = 0

На этом поток блокировки заканчивается. Весь процесс довольно сложен, используя как метод ожидания вращения, так и метод блокирующего ожидания семафора, балансируя производительность и справедливость, подходящий для большинства сценариев конкуренции за блокировку.

Unlock

Поток разблокировки относительно намного проще. Сначала пытается быстрая разблокировка. Если new равен 0, это означает, что нет ожидающих goroutine и не в режиме голодания, т.е. разблокировка успешна, может напрямую вернуться.

go
func (m *Mutex) Unlock() {
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

В противном случае нужно войти в поток unlockSlow:

  1. Сначала проверяет, уже ли разблокировано:

    go
    if (new+mutexLocked)&mutexLocked == 0 {
    	fatal("sync: unlock of unlocked mutex")
    }
  2. Если в режиме голодания, напрямую освобождает семафор для завершения разблокировки. В режиме голодания текущая разблокирующая goroutine напрямую передаёт владение блокировкой следующей ожидающей goroutine.

    go
    if new&mutexStarving == 0 {
    	...
    } else {
    	runtime_Semrelease(&m.sema, true, 1)
    }
  3. Если не в режиме голодания, входит в нормальный поток разблокировки:

    1. Если нет goroutine в ожидании, или другие пробуждённые goroutine уже получили блокировку, или блокировка вошла в режим голодания:

      go
      if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
          return
      }
    2. В противном случае освобождает семафор для пробуждения следующей ожидающей goroutine, устанавливает текущее состояние блокировки в mutexWoken:

      go
      new = (old - 1<<mutexWaiterShift) | mutexWoken
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
          runtime_Semrelease(&m.sema, false, 1)
          return
      }
      old = m.state

Наконец, поток разблокировки заканчивается.

RWMutex

Определение типа мьютекса чтения-записи RWMutex расположено в файле sync/rwmutex.go. Его реализация также основана на мьютексе.

go
type RWMutex struct {
	w           Mutex        // удерживается, если есть ожидающие писатели
	writerSem   uint32       // семафор для писателей для ожидания завершения читателей
	readerSem   uint32       // семафор для читателей для ожидания завершения писателей
	readerCount atomic.Int32 // количество ожидающих читателей
	readerWait  atomic.Int32 // количество уходящих читателей
}

Определения полей:

  • w, мьютекс. Когда goroutine-писатель удерживает этот мьютекс, другие goroutine-писатели и goroutine-читатели блокируются.
  • writerSem, семафор записи, используется для блокировки goroutine-писателей, ожидающих goroutine-читателей. Goroutine-писатель приобретает семафор, goroutine-читатель освобождает семафор.
  • readerSem, семафор чтения, используется для блокировки goroutine-читателей, ожидающих goroutine-писателей. Goroutine-читатель приобретает семафор, goroutine-писатель освобождает семафор.
  • readerCount, основное поле, вся блокировка чтения-записи зависит от него для поддержания состояния.
  • readerWait, указывает количество goroutine-читателей, которые должны ждать, когда goroutine-писатель заблокирован.

Общий принцип блокировки чтения-записи: через мьютекс сделать goroutine-писателей взаимно исключающими, через два семафора writerSem и readerSem сделать чтение-запись взаимно исключающими, чтение-чтение общим.

readerCount

Поскольку этот readerCount довольно много меняется и очень важен, он выделен для обсуждения. Он примерно обобщает следующие состояния:

  1. 0: В настоящее время нет активных goroutine-читателей или активных goroutine-писателей, в холостом состоянии.
  2. -rwmutexMaxReaders: Goroutine-писатель уже удерживает мьютекс, в настоящее время нет активных goroutine-читателей.
  3. -rwmutexMaxReaders+N: Goroutine-писатель уже удерживает блокировку записи, текущие goroutine-читатели должны блокироваться и ждать, пока goroutine-писатель освободит блокировку записи.
  4. N-rwmutexMaxReaders: Goroutine-писатель уже удерживает мьютекс, должен блокироваться и ждать, пока оставшиеся goroutine-читатели освободят блокировку чтения.
  5. N: В настоящее время имеет N активных goroutine-читателей, т.е. добавлено N блокировок чтения, нет активных goroutine-писателей.

Среди них rwmutexMaxReaders — это константное значение, его значение в два раза больше количества goroutine, которые могут блокироваться и ожидать на мьютексе, потому что половина — это goroutine-читатели, а половина — goroutine-писатели.

go
const rwmutexMaxReaders = 1 << 30

Часть всей блокировки чтения-записи, этот readerCount самый сложный. Понимание его изменений проясняет рабочий поток блокировки чтения-записи.

TryLock

Как обычно, давайте сначала посмотрим на самый простой TryLock():

go
func (rw *RWMutex) TryLock() bool {
	if !rw.w.TryLock() {
		return false
	}
	if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
		rw.w.Unlock()
		return false
	}
	return true
}

Он начинается с попытки вызвать TryLock() мьютекса. Если не удалось, напрямую возвращается. Затем использует операцию CAS для попытки обновить значение readerCount с 0 до -rwmutexMaxReaders. 0 представляет отсутствие работающих goroutine-читателей, -rwmutexMaxReaders указывает, что goroutine-писатель уже удерживает мьютекс. Если обновление операции CAS не удалось, разблокирует мьютекс. Если успешно, возвращает true.

Lock

Далее Lock(), его реализация также очень проста.

go
func (rw *RWMutex) Lock() {
	rw.w.Lock()
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
}

Сначала конкурирует с другими goroutine-писателями до удержания мьютекса, затем выполняет эту операцию: сначала атомарно вычитает -rwmutexMaxReaders, затем неатомарно добавляет rwmutexMaxReaders к новому значению:

go
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

Разобьём на два шага для просмотра:

  1. Это уведомить другие goroutine-читатели, что goroutine-писатель теперь пытается удерживать блокировку, как уже охватывалось в разделе TryLock.

    go
    rw.readerCount.Add(-rwmutexMaxReaders)
  2. Затем добавляет rwmutexMaxReaders для получения r, что представляет количество goroutine-читателей, работающих в настоящее время.

    go
    r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

Затем проверяет, работают ли goroutine-читатели, затем добавляет r к значению readerWait. Если всё ещё не 0, это означает, что нужно ждать, пока эти goroutine-читатели закончат работу, затем входит в поток runtime_SemacquireRWMutex для попытки приобрести семафор writerSem. Этот семафор освобождается goroutine-читателями. Если может получить семафор, это означает, что goroutine-читатели закончили работу. В противном случае нужно войти в очередь блокировки для ожидания (эта часть логики семафора в основном согласуется с частью мьютекса).

UnLock

Затем UnLock(), освобождение блокировки записи.

go
func (rw *RWMutex) Unlock() {
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
       fatal("sync: Unlock of unlocked RWMutex")
    }

    for i := 0; i < int(r); i++ {
       runtime_Semrelease(&rw.readerSem, false, 0)
    }

    rw.w.Unlock()
}

Его поток следующий:

  1. Как упоминалось ранее, во время блокировки readerCount обновляется до отрицательного значения. Здесь добавление rwmutexMaxReaders означает, что теперь нет работающих goroutine-писателей, и полученное значение — это количество goroutine-читателей, блокирующих и ожидающих.

    go
    r := rw.readerCount.Add(rwmutexMaxReaders)
  2. Если оно само 0 или больше 0, это означает, что блокировка записи уже была освобождена:

    go
    if r >= rwmutexMaxReaders {
    	fatal("sync: Unlock of unlocked RWMutex")
    }
  3. Освобождает семафор readerSem, пробуждает ожидающие goroutine-читатели:

    go
    for i := 0; i < int(r); i++ {
    	runtime_Semrelease(&rw.readerSem, false, 0)
    }
  4. Наконец освобождает мьютекс, пробуждает ожидающие goroutine-писатели:

    go
    rw.w.Unlock()

Освобождение блокировки записи завершено.

TryRLock

Далее, давайте посмотрим на часть блокировки чтения. Это код TryRLock:

go
func (rw *RWMutex) TryRLock() bool {
	for {
		c := rw.readerCount.Load()
		if c < 0 {
			return false
		}
		if rw.readerCount.CompareAndSwap(c, c+1) {
			return true
		}
	}
}

Он делает только две вещи:

  1. Проверяет, работают ли goroutine-писатели. Если да, блокировка не удалась.

    go
    c := rw.readerCount.Load()
    if c < 0 {
    	return false
    }
  2. Пытается инкрементировать readerCount на 1. Если обновление успешно, блокировка успешна:

    go
    if rw.readerCount.CompareAndSwap(c, c+1) {
    	return true
    }
  3. В противном случае продолжает циклическую проверку до выхода.

Видно, что зависимость readerCount здесь всё поддерживается в части блокировки записи. Это также почему блокировка записи обсуждается сначала, потому что сложная основная часть поддерживается в части блокировки записи.

RLock

Логика RLock ещё проще:

go
func (rw *RWMutex) RLock() {
	if rw.readerCount.Add(1) < 0 {
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
}

Он пытается инкрементировать значение readerCount на 1. Если новое значение всё ещё меньше 0, это означает, что goroutine-писатели работают, затем входит в поток блокировки семафора readerSem. Текущая goroutine входит в очередь блокировки для ожидания.

RUnLock

RUnLock также одинаково прост и лёгок для понимания:

go
func (rw *RWMutex) RUnlock() {
    if r := rw.readerCount.Add(-1); r < 0 {
       rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		fatal("sync: RUnlock of unlocked RWMutex")
	}
	if rw.readerWait.Add(-1) == 0 {
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Он сначала пытается декрементировать readerCount на один, указывая, что количество активных goroutine-читателей декрементируется на один. Если полученное значение больше 0, это означает, что может напрямую освободить, потому что теперь ни одна goroutine-писатель не удерживает мьютекс. Если меньше 0, это означает, что goroutine-писатель уже удерживает мьютекс, он ждёт, пока все текущие goroutine-читатели завершат работу. Далее входит в поток runlockSlow:

  1. Если исходное значение readerCount равно 0 (блокировка холостая) или -rwmutexMaxReaders (goroutine-писатель не имеет goroutine-читателей для ожидания, т.е. блокировки чтения уже все освобождены), это означает, что в настоящее время нет активных goroutine-читателей, не нужно разблокировать:

    go
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
    	fatal("sync: RUnlock of unlocked RWMutex")
    }
  2. Если есть активные goroutine-читатели, декрементирует readerWait на один. Если текущая goroutine-читатель — последняя активная goroutine-читатель, освобождает семафор writerSem, пробуждает ожидающие goroutine-писатели:

    go
    if rw.readerWait.Add(-1) == 0 {
    	runtime_Semrelease(&rw.writerSem, false, 1)
    }

Поток освобождения блокировки чтения заканчивается.

Semaphore

Семафор внутри мьютекса — это простой целый uint32, представляющий приобретение и освобождение семафора через атомарное декрементирование на один и инкрементирование на один. Структура, ответственная за поддержание семафоров во время выполнения, — это runtime.semaRoot, чьё определение типа расположено в файле runtime/sema.go. semaRoot использует сбалансированное бинарное дерево (treap) для организации и управления семафорами. Каждый узел в дереве представляет семафор, тип узла — *sudog, который является двусвязным списком, поддерживающим очередь ожидания для соответствующих семафоров. Узлы поддерживают уникальность через *sudog.elem (адрес семафора) и поддерживают баланс дерева через поле *sudog.ticket.

go
type semaRoot struct {
	lock  mutex
	treap *sudog        // корень сбалансированного дерева уникальных ожидающих.
	nwait atomic.Uint32 // Количество ожидающих. Чтение без блокировки.
}

Дерево семафора semaRoot зависит от нижележащего мьютекса runtime.mutex для обеспечения его безопасности конкурентности.

go
var semtable semTable

// Простое число, чтобы не коррелировать с любыми пользовательскими паттернами.
const semTabSize = 251

type semTable [semTabSize]struct {
	root semaRoot
    // Используется для выравнивания памяти, улучшения производительности
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

semaRoot хранится в глобальной semaTable во время выполнения. Это выглядит как массив фиксированной длины, используемый для хранения коллекций нескольких корневых узлов дерева семафора, но на самом деле с операционной точки зрения это на самом деле хэш-таблица. Каждый элемент в таблице содержит semaRoot и некоторые байты заполнения (pad), используемые для выравнивания памяти и избежания конкуренции строки кэша. semTabSize — это константа размера таблицы семафора, указывающая длину таблицы как 251, обычно выбирая простое число для уменьшения хэш-коллизий и улучшения эффективности хэширования.

go
func (t *semTable) rootFor(addr *uint32) *semaRoot {
	return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

Метод rootFor эквивалентен хэш-функции. Он принимает указатель типа uint32 addr (т.е. адрес семафора) и возвращает указатель на структуру semaRoot, соответствующую этому адресу. Эта строка кода сначала преобразует addr в uintptr, затем сдвигает вправо на 3 бита, эквивалентно делению на 8 (потому что один байт занимает 8 бит, деление адреса указателя на 8 может отобразить его на индекс массива). Путём взятия модуля semTabSize обеспечивает, что индекс находится в пределах размера таблицы семафора. После получения semaRoot через индекс идёт к сбалансированному дереву для поиска очереди ожидания *sudog, соответствующей семафору.

Acquire

Приобретение семафора, соответствующая реализация — функция runtime.semacquire1:

go
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)

Она получает следующие параметры:

  • addr, адрес семафора
  • lifo, влияет на порядок dequeue сбалансированного дерева, по умолчанию FIFO, LIFO — last-in-first-out, когда время ожидания блокировки goroutine не равно 0 (по крайней мере заблокирован один раз), это true
  • profile, флаги для анализа производительности блокировки
  • skipframes, количество пропущенных кадров стека
  • reason, причина блокировки

Ниже кратко описывается весь поток приобретения семафора:

  1. Проверяет состояние goroutine. Если текущая goroutine не является goroutine, находящейся в планировании, напрямую выбрасывает исключение:

    go
    gp := getg()
    if gp != gp.m.curg {
    	throw("semacquire not on the G stack")
    }
  2. Проверяет, может ли семафор быть приобретён, и пытается приобрести семафор через неблокирующий метод. Если может приобрести, может напрямую вернуться:

    go
    for {
    	v := atomic.Load(addr)
    	if v == 0 {
    		return false
    	}
    	if atomic.Cas(addr, v, v-1) {
    		return true
    	}
    }
  3. Если не может приобрести неблокирующим способом, входит в цикл для приобретения семафора через обычные средства. Сначала получает *sudog из кэша через acquireSudog(), эта структура представляет блокирующую ожидающую goroutine:

    s := acquireSudog()
  4. Затем получает дерево семафора из глобальной таблицы:

    go
    root := semtable.rootFor(addr)
  5. Входит в цикл, блокирует дерево семафора, проверяет ещё раз, может ли семафор быть приобретён. Если нет, добавляет его в дерево семафора, затем вызывает gopark для приостановки и ожидания, пока пробуждён продолжает повторять этот процесс, циклически до получения семафора:

    go
    for {
    	lockWithRank(&root.lock, lockRankRoot)
    	root.nwait.Add(1)
    	if cansemacquire(addr) {
    		root.nwait.Add(-1)
    		unlock(&root.lock)
    		break
    	}
    	root.queue(addr, s, lifo)
    	goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
    	if s.ticket != 0 || cansemacquire(addr) {
    		break
    	}
    }
  6. Наконец, когда пробуждён, освобождает *sudog, возвращает его в кэш:

    go
    releaseSudog(s)

Release

Освобождение семафора, пробуждение блокирующих ожидающих goroutine. Эта функциональность реализована функцией runtime.semrelease1:

go
func semrelease1(addr *uint32, handoff bool, skipframes int)

Она получает следующие параметры:

  • addr, адрес семафора
  • handoff, указывает, следует ли напрямую переключать G, в настоящее время планируемый P, на пробуждённый G, только true в режиме голодания
  • skipframes, количество пропущенных кадров стека

Ниже кратко описывается весь процесс освобождения:

  1. Получает дерево семафора, затем инкрементирует семафор на один, указывая освобождение одного семафора:

    go
    root := semtable.rootFor(addr)
    atomic.Xadd(addr, 1)
  2. Если количество ожидающих goroutine равно 0, напрямую возвращается:

    go
    if root.nwait.Load() == 0 {
    	return
    }
  3. Блокирует дерево семафора, вторичная проверка, есть ли ожидающие goroutine:

    go
    lockWithRank(&root.lock, lockRankRoot)
    if root.nwait.Load() == 0 {
    	unlock(&root.lock)
    	return
    }
  4. Получает блокирующую ожидающую goroutine из дерева семафора, nwait декрементируется на один, затем освобождает блокировку семафора:

    go
    s, t0, tailtime := root.dequeue(addr)
    if s != nil {
    	root.nwait.Add(-1)
    }
    unlock(&root.lock)
  5. Проверяет, может ли семафор быть приобретён:

    go
    if handoff && cansemacquire(addr) {
    	s.ticket = 1
    }
  6. Функция readyWithTime напрямую использует пробуждённую goroutine G как следующую goroutine для запуска для P, т.е. модифицирует *p.runnext=g:

    go
    readyWithTime(s, 5+skipframes)
  7. Если handoff равен true, тогда goyield отвяжет текущую освобождающую семафор goroutine G от текущего M и повторно добавит в конец локальной очереди выполнения P, затем запустит новый цикл планирования, так что пробуждённая goroutine G может немедленно получить планирование:

    go
    if s.ticket == 1 && getg().m.locks == 0 {
    	goyield()
    }

Потоки приобретения и освобождения семафора — это эти. Язык Go использует семафоры не только в блокировках мьютекса. Он размещён здесь, потому что семафоры имеют наибольшую корреляцию с блокировками мьютекса. Официальные лица даже написали в комментариях:

// Асинхронный семафор для sync.Mutex.

После понимания семафоров, взгляд назад на мьютексы становится яснее.

TIP

Относительно дерева семафора semaRoot, его dequeue и enqueue включают операции самобалансировки, которые довольно сложны для реализации. Углубление в них не связано с темой этой статьи и бессмысленно, поэтому это скрыто. Если интересно, вы можете исследовать исходный код самостоятельно.

Итоги

Мьютекс sync.Mutex реализует ожидание goroutine через вращение и семафор два механизма. Вращение неблокирующее, но нуждается в строгих ограничениях, потому что оно потребляет ресурсы CPU; в то время как семафоры блокирующие и могут эффективно избежать ненужного потребления ресурсов. Для реализации более справедливого механизма конкуренции Go различает нормальный режим и режим голодания, чтобы обеспечить goroutine возможность конкурировать за блокировки более сбалансированно. По сравнению с runtime.mutex этим видом нижележащей блокировки, sync.Mutex как ориентированная на пользователя блокировка учитывает больше фактических сценариев использования во время дизайна.

Блокировка чтения-записи sync.RWMutex реализует взаимное исключение запись-запись через мьютекс sync.Mutex и на этой основе дополнительно добавляет два семафора, используемых для реализации взаимного исключения чтение-запись и общего доступа чтение-чтение, тем самым поддерживая несколько конкурентных сценариев.

Хотя реализация блокировки кажется сложной, как только вы понимаете принципы Mutex, изучение других инструментов синхронизации в стандартной библиотеке sync становится намного легче.

Golang by www.golangdev.cn edit