Skip to content

Параллелизм

Поддержка параллелизма в Go естественна — это ядро языка. Язык прост в освоении, разработчики могут создавать параллельные приложения без глубокого понимания внутренней реализации.

Горутины

Горутина (coroutine) — лёгкий поток, поток пользовательского уровня, не управляется напрямую ОС, а планировщиком Go. Переключение контекста имеет малые накладные расходы, поэтому параллелизм Go эффективен. Концепция горутин не нова, Go не первый язык с горутинами, но первый, сделавший их простыми и элегантными.

В Go создание горутины просто — ключевое слово go перед вызовом функции:

TIP

Встроенные функции с возвращаемым значением нельзя использовать после go:

go
go make([]int,10) // go discards result of make([]int, 10) (value of type []int)
go
func main() {
  go fmt.Println("hello world!")
  go hello()
  go func() {
    fmt.Println("hello world!")
  }()
}

func hello() {
  fmt.Println("hello world!")
}

Все три способа работают. Но пример ничего не выведет — горутины выполняются параллельно, система тратит время на создание, главная горутина завершается, дочерние тоже завершаются. Порядок выполнения неопределён:

go
func main() {
  fmt.Println("start")
  for i := 0; i < 10; i++ {
    go fmt.Println(i)
  }
  fmt.Println("end")
}

Возможные выводы:

start
end

Или:

start
0
1
5
3
4
6
7
end

Простейшее решение — time.Sleep:

go
func main() {
  fmt.Println("start")
  for i := 0; i < 10; i++ {
    go fmt.Println(i)
    time.Sleep(time.Millisecond)
  }
  time.Sleep(time.Millisecond)
  fmt.Println("end")
}

Вывод:

start
0
1
2
3
4
5
6
7
8
9
end

Но проблема параллелизма не решена. Факторов много: время выполнения, порядок, затраты. Если задача сложная,耗时不确定,问题重现:

go
func main() {
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      go hello(i)
      time.Sleep(time.Millisecond)
   }
   time.Sleep(time.Millisecond)
   fmt.Println("end")
}

func hello(i int) {
   time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
   fmt.Println(i)
}

Вывод неопределён:

start
0
3
4
end

time.Sleep — не решение. Go предоставляет средства контроля:

  • channel: канал
  • WaitGroup: семафор
  • Context: контекст

WaitGroup контролирует группу горутин, Context подходит для вложенных горутин, каналы — для коммуникации. Для блокировок:

  • Mutex: мьютекс
  • RWMutex: блокировка чтения-записи

Каналы

channel — канал. Объяснение Go:

Do not communicate by sharing memory; instead, share memory by communicating.

Обмен памятью через сообщения. channel — решение коммуникации горутин, также для контроля параллелизма.

Синтаксис: ключевое слово chan представляет тип канала, должен объявить тип хранимых данных:

go
var ch chan int

Канал не инициализирован, значение nil, нельзя использовать.

Создание

Создание канала — функция make, принимает тип канала и опциональный размер буфера:

go
intCh := make(chan int)
strCh := make(chan string, 1)

После использования канал нужно закрыть функцией close:

go
func close(c chan<- Type)

Пример:

go
func main() {
  intCh := make(chan int)
  close(intCh)
}

Иногда лучше использовать defer.

Чтение/запись

Операторы для операций:

ch <-: запись в канал

<- ch: чтение из канала

<- показывает направление потока. Пример для int:

go
func main() {
  intCh := make(chan int, 1)
  defer close(intCh)
  intCh <- 114514
  fmt.Println(<-intCh)
}

Создан буферизированный канал, записано 114514, прочитано и выведено, закрыто. Чтение имеет второе возвращаемое значение — булево, успешность чтения:

go
ints, ok := <-intCh

Поток данных как очередь FIFO. Операции синхронны — в момент времени только одна горутина пишет, одна читает.

Без буфера

Буфер без буфера ёмкостью 0, не хранит данные. При записи требуется немедленное чтение, иначе блокировка. Чтение аналогично. Код вызывает deadlock:

go
func main() {
  ch := make(chan int)
  defer close(ch)
  ch <- 123
  n := <-ch
  fmt.Println(n)
}

Правильно — новая горутина для отправки:

go
func main() {
  ch := make(chan int)
  defer close(ch)
  go func() {
    ch <- 123
  }()
  n := <-ch
  fmt.Println(n)
}

С буфером

Канал с буфером как блокирующая очередь. Чтение пустого и запись полного канала блокируют. Без буфера требует немедленного приёма. С буфером данные помещаются в буфер, блокировка при заполнении. Чтение из буфера до опустошения, затем блокировка. Код работает:

go
func main() {
   ch := make(chan int, 1)
   defer close(ch)
   ch <- 123
   n := <-ch
   fmt.Println(n)
}

Но синхронное чтение/запись опасны — при пустом/полном буфере блокировка. Пример:

go
func main() {
  ch := make(chan int, 5)
  chW := make(chan struct{})
  chR := make(chan struct{})
  defer func() {
    close(ch)
    close(chW)
    close(chR)
  }()
  go func() {
    for i := 0; i < 10; i++ {
      ch <- i
      fmt.Println("запись", i)
    }
    chW <- struct{}{}
  }()
  go func() {
    for i := 0; i < 10; i++ {
      time.Sleep(time.Millisecond)
      fmt.Println("чтение", <-ch)
    }
    chR <- struct{}{}
  }()
  fmt.Println("запись завершена", <-chW)
  fmt.Println("чтение завершено", <-chR)
}

Создано 3 канала: буферизированный для коммуникации, два без буфера для синхронизации. Чтение занимает 1 мс, запись максимум 5 данных (буфер заполнен). Вывод:

запись 0
запись 1
запись 2
запись 3
запись 4
чтение 0
запись 5
чтение 1
запись 6
чтение 2
запись 7
чтение 3
запись 8
запись 9
чтение 4
запись завершена {}
чтение 5
чтение 6
чтение 7
чтение 8
чтение 9
чтение завершено {}

TIP

len возвращает количество данных в буфере, cap — размер буфера:

go
func main() {
   ch := make(chan int, 5)
   ch <- 1
   ch <- 2
   ch <- 3
   fmt.Println(len(ch), cap(ch))
}

Вывод:

3 5

Пример ожидания завершения горутины:

go
func main() {
   ch := make(chan struct{})
   defer close(ch)
   go func() {
      fmt.Println(2)
      ch <- struct{}{}
   }()
   <-ch
   fmt.Println(1)
}

Вывод:

2
1

Пример мьютекса через канал:

go
var count = 0
var lock = make(chan struct{}, 1)

func Add() {
  lock <- struct{}{}
  fmt.Println("счет", count, "сложение")
  count += 1
  <-lock
}

func Sub() {
  lock <- struct{}{}
  fmt.Println("счет", count, "вычитание")
  count -= 1
  <-lock
}

Буфер размером 1, только один элемент. При записи другими горутинами блокировка до освобождения. В момент времени только одна горутина изменяет count.

Примечания

Блокировка канала:

Чтение/запись без буфера

Синхронные операции блокируют:

go
func main() {
   intCh := make(chan int)
   defer close(intCh)
   intCh <- 1
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

Чтение пустого буфера

go
func main() {
   intCh := make(chan int, 1)
   defer close(intCh)
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

Запись полного буфера

go
func main() {
  intCh := make(chan int, 1)
  defer close(intCh)
  intCh <- 1
  intCh <- 1
}

Канал nil

go
func main() {
  var intCh chan int
  intCh <- 1
}
go
func main() {
  var intCh chan int
  fmt.Println(<-intCh)
}

Паника:

Закрытие nil канала

go
func main() {
  var intCh chan int
  close(intCh)
}

Запись в закрытый канал

go
func main() {
  intCh := make(chan int, 1)
  close(intCh)
  intCh <- 1
}

Закрытие закрытого канала

go
func main() {
  ch := make(chan int, 1)
  defer close(ch)
  go write(ch)
  fmt.Println(<-ch)
}

func write(ch chan<- int) {
  ch <- 1
  close(ch)
}

Односторонние каналы

Двусторонний канал — чтение и запись. Односторонний — только чтение или запись. Односторонние каналы ограничивают поведение, используются в параметрах и возвращаемых значениях:

go
func close(c chan<- Type)
func After(d Duration) <-chan Time

Синтаксис:

  • <-chan int — только чтение
  • chan<- string — только запись

Запись в канал только чтения не компилируется:

go
func main() {
  timeCh := time.After(time.Second)
  timeCh <- time.Now()
}

Ошибка:

invalid operation: cannot send to receive-only channel timeCh

Двусторонний канал преобразуется в односторонний, обратно нет:

go
func main() {
   ch := make(chan int, 1)
   go write(ch)
   fmt.Println(<-ch)
}

func write(ch chan<- int) {
   ch <- 1
}

TIP

chan — ссылочный тип, даже при передаче по значению ссылка остаётся той же.

for range

for range читает буферизированный канал:

go
func main() {
  ch := make(chan int, 10)
  go func() {
    for i := 0; i < 10; i++ {
      ch <- i
    }
  }()
  for n := range ch {
    fmt.Println(n)
  }
}

for range имеет одно возвращаемое значение, блокируется при пустом буфере. Вывод:

0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!

Deadlock — подгорутина завершилась, главная ждёт. Нужно закрыть канал:

go
func main() {
   ch := make(chan int, 10)
   go func() {
      for i := 0; i < 10; i++ {
         ch <- i
      }
      close(ch)
   }()
   for n := range ch {
      fmt.Println(n)
   }
}

Чтение имеет два возвращаемых значения. for range выходит при неудачном чтении. Второе значение — успешность чтения, не закрытие канала. Закрытый буферизированный канал читается с true:

go
func main() {
  ch := make(chan int, 10)
  for i := 0; i < 5; i++ {
    ch <- i
  }
  close(ch)
  for i := 0; i < 6; i++ {
    n, ok := <-ch
    fmt.Println(n, ok)
  }
}

Вывод:

0 true
1 true
2 true
3 true
4 true
0 false

Закрытый канал не блокирует, шестое чтение — нулевое значение, okfalse.

TIP

Закрывать канал следует на стороне отправителя, не получателя.

WaitGroup

sync.WaitGroup — структура пакета sync для ожидания группы горутин. Три метода:

Add указывает количество горутин:

go
func (wg *WaitGroup) Add(delta int)

Done завершает горутины:

go
func (wg *WaitGroup) Done()

Wait ожидает завершения:

go
func (wg *WaitGroup) Wait()

WaitGroup прост в использовании. Реализация — счётчик + семафор. Add инициализирует, Done уменьшает на 1, Wait блокирует до 0. Пример:

go
func main() {
  var wait sync.WaitGroup
  wait.Add(1)
  go func() {
    fmt.Println(1)
    wait.Done()
  }()
  wait.Wait()
  fmt.Println(2)
}

Вывод:

1
2

Пример с циклом:

go
func main() {
   var mainWait sync.WaitGroup
   var wait sync.WaitGroup
   mainWait.Add(10)
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      wait.Add(1)
      go func() {
         fmt.Println(i)
         wait.Done()
         mainWait.Done()
      }()
      wait.Wait()
   }
   mainWait.Wait()
   fmt.Println("end")
}

Вывод:

start
0
1
2
3
4
5
6
7
8
9
end

WaitGroup не следует копировать, передавать указатель:

go
func main() {
  var mainWait sync.WaitGroup
  mainWait.Add(1)
  hello(mainWait)
  mainWait.Wait()
  fmt.Println("end")
}
func hello(wait sync.WaitGroup) {
  fmt.Println("hello")
  wait.Done()
}

Ошибка deadlock:

hello
fatal error: all goroutines are asleep - deadlock!

TIP

Отрицательный счётчик или больше количества горутин вызывает panic.

Context

Context — решение контроля параллелизма, лучше контролирует вложенные горутины. Context — интерфейс, реализации:

  • emptyCtx
  • cancelCtx
  • timerCtx
  • valueCtx

Context

Определение интерфейса:

go
type Context interface {
   Deadline() (deadline time.Time, ok bool)
   Done() <-chan struct{}
   Err() error
   Value(key any) any
}

Deadline

Возвращает время отмены и наличие установки:

go
Deadline() (deadline time.Time, ok bool)

Done

Канал только чтения, закрывается при отмене:

go
Done() <-chan struct{}

Err

Возвращает причину закрытия:

go
Err() error

Value

Возвращает значение по ключу:

go
Value(key any) any

emptyCtx

Пустой контекст, создаётся через context.Background и context.TODO:

go
var (
  background = new(emptyCtx)
  todo       = new(emptyCtx)
)

func Background() Context {
  return background
}

func TODO() Context {
  return todo
}

emptyCtx — тип int, экземпляры имеют разные адреса, не отменяется, нет deadline, нет значений:

go
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return }
func (*emptyCtx) Done() <-chan struct{} { return nil }
func (*emptyCtx) Err() error { return nil }
func (*emptyCtx) Value(key any) any { return nil }

emptyCtx — верхнеуровневый контекст, родитель для других.

valueCtx

valueCtx содержит пару ключ-значение и встроенный Context:

go
type valueCtx struct {
   Context
   key, val any
}

Реализован метод Value, ищет в родителе:

go
func (c *valueCtx) Value(key any) any {
   if c.key == key {
      return c.val
   }
   return value(c.Context, key)
}

Пример:

go
var waitGroup sync.WaitGroup

func main() {
  waitGroup.Add(1)
  go Do(context.WithValue(context.Background(), 1, 2))
  waitGroup.Wait()
}

func Do(ctx context.Context) {
  ticker := time.NewTimer(time.Second)
  defer waitGroup.Done()
  for {
    select {
    case <-ctx.Done():
    case <-ticker.C:
      fmt.Println("timeout")
      return
    default:
      fmt.Println(ctx.Value(1))
    }
    time.Sleep(time.Millisecond * 100)
  }
}

Вывод:

2
2
2
2
2
2
2
2
2
2
timeout

cancelCtx

cancelCtx и timerCtx реализуют canceler:

go
type canceler interface {
  cancel(removeFromParent bool, err error)
  Done() <-chan struct{}
}

cancel не экспортируется, упаковывается в WithCancel:

go
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   c := newCancelCtx(parent)
   propagateCancel(parent, &c)
   return &c, func() { c.cancel(true, Canceled) }
}

cancelCtx — отменяемый контекст. При создании добавляется в children родителя. При вызове cancelFunc канал Done закрывается, дочерние отменяются, удаляется из родителя. Пример:

go
var waitGroup sync.WaitGroup

func main() {
  bkg := context.Background()
  cancelCtx, cancel := context.WithCancel(bkg)
  waitGroup.Add(1)
  go func(ctx context.Context) {
    defer waitGroup.Done()
    for {
      select {
      case <-ctx.Done():
        fmt.Println(ctx.Err())
        return
      default:
        fmt.Println("ожидание отмены...")
      }
      time.Sleep(time.Millisecond * 200)
    }
  }(cancelCtx)
  time.Sleep(time.Second)
  cancel()
  waitGroup.Wait()
}

Вывод:

ожидание отмены...
ожидание отмены...
ожидание отмены...
ожидание отмены...
ожидание отмены...
context canceled

Вложенный пример:

go
var waitGroup sync.WaitGroup

func main() {
   waitGroup.Add(3)
   ctx, cancelFunc := context.WithCancel(context.Background())
   go HttpHandler(ctx)
   time.Sleep(time.Second)
   cancelFunc()
   waitGroup.Wait()
}

func HttpHandler(ctx context.Context) {
   cancelCtxAuth, cancelAuth := context.WithCancel(ctx)
   cancelCtxMail, cancelMail := context.WithCancel(ctx)
   defer cancelAuth()
   defer cancelMail()
   defer waitGroup.Done()
   go AuthService(cancelCtxAuth)
   go MailService(cancelCtxMail)
   for {
      select {
      case <-ctx.Done():
         fmt.Println(ctx.Err())
         return
      default:
         fmt.Println("обработка http...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

func AuthService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("auth отмена", ctx.Err())
         return
      default:
         fmt.Println("auth...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

func MailService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("mail отмена", ctx.Err())
         return
      default:
         fmt.Println("mail...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

Вывод:

обработка http...
auth...
mail...
mail...
auth...
обработка http...
auth...
mail...
обработка http...
обработка http...
auth...
mail...
auth...
обработка http...
mail...
context canceled
auth отмена context canceled
mail отмена context canceled

timerCtx

timerCtx добавляет тайм-аут к cancelCtx. Функции создания: WithDeadline и WithTimeout:

go
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

timerCtx отменяется по времени, закрывает timer. Пример:

go
var wait sync.WaitGroup

func main() {
  deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
  defer cancel()
  wait.Add(1)
  go func(ctx context.Context) {
    defer wait.Done()
    for {
      select {
      case <-ctx.Done():
        fmt.Println("отмена", ctx.Err())
        return
      default:
        fmt.Println("ожидание...")
      }
      time.Sleep(time.Millisecond * 200)
    }
  }(deadline)
  wait.Wait()
}

Вывод:

ожидание...
ожидание...
ожидание...
ожидание...
ожидание...
отмена context deadline exceeded

WithTimeout вызывает WithDeadline:

go
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
   return WithDeadline(parent, time.Now().Add(timeout))
}

TIP

Контекст — ресурс, утечка при отсутствии отмены.

Select

select — решение мультиплексирования каналов. В момент времени проверяется доступность каналов. Синтаксис как switch:

go
func main() {
  chA := make(chan int)
  chB := make(chan int)
  chC := make(chan int)
  defer func() {
    close(chA)
    close(chB)
    close(chC)
  }()
  select {
  case n, ok := <-chA:
    fmt.Println(n, ok)
  case n, ok := <-chB:
    fmt.Println(n, ok)
  case n, ok := <-chC:
    fmt.Println(n, ok)
  default:
    fmt.Println("все каналы недоступны")
  }
}

Использование

select состоит из case и опционального default. Каждый case оперирует одним каналом, чтение или запись. При нескольких доступных case выбирается псевдослучайно. При недоступности всех выполняется default, иначе блокировка. Пример с записью:

go
func main() {
   chA := make(chan int)
   chB := make(chan int)
   chC := make(chan int)
   defer func() {
      close(chA)
      close(chB)
      close(chC)
   }()
   go func() {
      chA <- 1
   }()
   select {
   case n, ok := <-chA:
      fmt.Println(n, ok)
   case n, ok := <-chB:
      fmt.Println(n, ok)
   case n, ok := <-chC:
      fmt.Println(n, ok)
   }
}

Для постоянного мониторинга с for:

go
func main() {
  chA := make(chan int)
  chB := make(chan int)
  chC := make(chan int)
  defer func() {
    close(chA)
    close(chB)
    close(chC)
  }()
  go Send(chA)
  go Send(chB)
  go Send(chC)
  for {
    select {
    case n, ok := <-chA:
      fmt.Println("A", n, ok)
    case n, ok := <-chB:
      fmt.Println("B", n, ok)
    case n, ok := <-chC:
      fmt.Println("C", n, ok)
    }
  }
}

func Send(ch chan<- int) {
  for i := 0; i < 3; i++ {
    time.Sleep(time.Millisecond)
    ch <- i
  }
}

С тайм-аутом:

go
func main() {
  chA := make(chan int)
  chB := make(chan int)
  chC := make(chan int)
  defer func() {
    close(chA)
    close(chB)
    close(chC)
  }()
  l := make(chan struct{})
  go Send(chA)
  go Send(chB)
  go Send(chC)
  go func() {
  Loop:
    for {
      select {
      case n, ok := <-chA:
        fmt.Println("A", n, ok)
      case n, ok := <-chB:
        fmt.Println("B", n, ok)
      case n, ok := <-chC:
        fmt.Println("C", n, ok)
      case <-time.After(time.Second):
        break Loop
      }
    }
    l <- struct{}{}
  }()
  <-l
}

func Send(ch chan<- int) {
  for i := 0; i < 3; i++ {
    time.Sleep(time.Millisecond)
    ch <- i
  }
}

Вывод:

C 0 true
A 0 true
B 0 true
A 1 true
B 1 true
C 1 true
B 2 true
C 2 true
A 2 true

Тайм-аут

time.After возвращает канал только чтения:

go
func main() {
  chA := make(chan int)
  defer close(chA)
  go func() {
    time.Sleep(time.Second * 2)
    chA <- 1
  }()
  select {
  case n := <-chA:
    fmt.Println(n)
  case <-time.After(time.Second):
    fmt.Println("тайм-аут")
  }
}

Блокировка

Пустой select блокирует:

go
func main() {
  fmt.Println("start")
  select {}
  fmt.Println("end")
}

TIP

Операции с nil каналом в case игнорируются:

go
func main() {
   var nilCh chan int
   select {
   case <-nilCh:
      fmt.Println("read")
   case nilCh <- 1:
      fmt.Println("write")
   case <-time.After(time.Second):
      fmt.Println("timeout")
   }
}

Неблокирующий

default в select для неблокирующих операций:

go
func TrySend(ch chan int, ele int) bool  {
	select {
	case ch <- ele:
		return true
	default:
		return false
	}
}

func TryRecv(ch chan int) (int, bool)  {
	select {
	case ele, ok := <-ch:
		return ele, ok
	default:
		return 0, false
	}
}

Проверка завершения context:

go
func IsDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

Блокировки

Пример:

go
var wait sync.WaitGroup
var count = 0

func main() {
   wait.Add(10)
   for i := 0; i < 10; i++ {
      go func(data *int) {
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         temp := *data
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         ans := 1
         *data = temp + ans
         fmt.Println(*data)
         wait.Done()
      }(&count)
   }
   wait.Wait()
   fmt.Println("результат", count)
}

10 горутин выполняют +1, результат не 10:

1
2
3
3
2
2
3
3
3
4
результат 4

Горутина A читает count=1, тратит 400 мс, B обновляет count, A перезаписывает. Нужны блокировки.

sync.Mutex и RWMutex предоставляют API: Lock() и Unlock(). Блокировки нерекурсивны, повторное использование вызывает fatal. Блокировка защищает инварианты:

go
func DoSomething() {
  Lock()
  Unlock()
}

Мьютекс

sync.Mutex реализует sync.Locker:

go
type Locker interface {
   Lock()
   Unlock()
}

Пример:

go
var wait sync.WaitGroup
var count = 0
var lock sync.Mutex

func main() {
  wait.Add(10)
  for i := 0; i < 10; i++ {
    go func(data *int) {
      lock.Lock()
      time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
      temp := *data
      time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
      ans := 1
      *data = temp + ans
      lock.Unlock()
      fmt.Println(*data)
      wait.Done()
    }(&count)
  }
  wait.Wait()
  fmt.Println("результат", count)
}

Вывод:

1
2
3
4
5
6
7
8
9
10
результат 10

Блокировка чтения-записи

RWMutex для частого чтения, редкой записи:

  • Чтение: другие записи блокируются, чтения нет
  • Запись: другие записи и чтения блокируются

sync.RWMutex реализует Locker:

go
func (rw *RWMutex) RLock()
func (rw *RWMutex) TryRLock() bool
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Lock()
func (rw *RWMutex) TryLock() bool
func (rw *RWMutex) Unlock()

TryRLock и TryLock неблокирующие. Пример:

go
var wait sync.WaitGroup
var count = 0
var rw sync.RWMutex

func main() {
  wait.Add(12)
  go func() {
    for i := 0; i < 3; i++ {
      go Write(&count)
    }
    wait.Done()
  }()
  go func() {
    for i := 0; i < 7; i++ {
      go Read(&count)
    }
    wait.Done()
  }()
  wait.Wait()
  fmt.Println("результат", count)
}

func Read(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
  rw.RLock()
  fmt.Println("чтение")
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  fmt.Println("освобождение", *i)
  rw.RUnlock()
  wait.Done()
}

func Write(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  rw.Lock()
  fmt.Println("запись")
  temp := *i
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  *i = temp + 1
  fmt.Println("освобождение", *i)
  rw.Unlock()
  wait.Done()
}

Вывод:

чтение
чтение
чтение
чтение
освобождение 0
освобождение 0
освобождение 0
освобождение 0
запись
освобождение 1
чтение
чтение
чтение
освобождение 1
освобождение 1
освобождение 1
запись
освобождение 2
запись
освобождение 3
результат 3

TIP

Блокировки передавать указателем.

Условные переменные

sync.Cond — механизм коммуникации:

go
func NewCond(l Locker) *Cond

Методы:

go
func (c *Cond) Wait()
func (c *Cond) Signal()
func (c *Cond) Broadcast()

Пример:

go
var wait sync.WaitGroup
var count = 0
var rw sync.RWMutex
var cond = sync.NewCond(rw.RLocker())

func main() {
  wait.Add(12)
  go func() {
    for i := 0; i < 3; i++ {
      go Write(&count)
    }
    wait.Done()
  }()
  go func() {
    for i := 0; i < 7; i++ {
      go Read(&count)
    }
    wait.Done()
  }()
  wait.Wait()
  fmt.Println("результат", count)
}

func Read(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
  rw.RLock()
  fmt.Println("чтение")
  for *i < 3 {
    cond.Wait()
  }
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  fmt.Println("освобождение", *i)
  rw.RUnlock()
  wait.Done()
}

func Write(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  rw.Lock()
  fmt.Println("запись")
  temp := *i
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  *i = temp + 1
  fmt.Println("освобождение", *i)
  rw.Unlock()
  cond.Broadcast()
  wait.Done()
}

Вывод:

чтение
чтение
чтение
чтение
запись
освобождение 1
чтение
запись
освобождение 2
чтение
чтение
запись
освобождение 3
освобождение 3
освобождение 3
освобождение 3
освобождение 3
освобождение 3
освобождение 3
результат 3

TIP

Использовать for, не if:

go
for !condition {
  cond.Wait()
}

sync

Пакет sync предоставляет инструменты параллелизма.

Once

sync.Once гарантирует однократное выполнение:

go
func (o *Once) Do(f func())

Пример:

go
var wait sync.WaitGroup

func main() {
  var slice MySlice
  wait.Add(4)
  for i := 0; i < 4; i++ {
    go func() {
      slice.Add(1)
      wait.Done()
    }()
  }
  wait.Wait()
  fmt.Println(slice.Len())
}

type MySlice struct {
  s []int
  o sync.Once
}

func (m *MySlice) Add(i int) {
  m.o.Do(func() {
    fmt.Println("инициализация")
    if m.s == nil {
      m.s = make([]int, 0, 10)
    }
  })
  m.s = append(m.s, i)
}

func (m *MySlice) Len() int {
  return len(m.s)
}

Вывод:

инициализация
4

Реализация — блокировка + атомарные операции:

go
type Once struct {
  done uint32
  m    Mutex
}

func (o *Once) Do(f func()) {
  if atomic.LoadUint32(&o.done) == 0 {
    o.doSlow(f)
  }
}

func (o *Once) doSlow(f func()) {
  o.m.Lock()
  defer o.m.Unlock()
  if o.done == 0 {
    defer atomic.StoreUint32(&o.done, 1)
    f()
  }
}

Pool

sync.Pool — пул временных объектов:

go
func (p *Pool) Get() any
func (p *Pool) Put(x any)

Поле New:

go
New func() any

Пример:

go
var wait sync.WaitGroup
var pool sync.Pool
var numOfObject atomic.Int64

type BigMemData struct {
   M string
}

func main() {
   pool.New = func() any {
      numOfObject.Add(1)
      return BigMemData{"большая память"}
   }
   wait.Add(1000)
   for i := 0; i < 1000; i++ {
      go func() {
         val := pool.Get()
         _ = val.(BigMemData)
         pool.Put(val)
         wait.Done()
      }()
   }
   wait.Wait()
   fmt.Println(numOfObject.Load())
}

Вывод:

5

Примечания:

  • Временные объекты: могут быть удалены GC
  • Непредсказуемо: неизвестно, новый или reused объект
  • Потокобезопасность: New должен быть потокобезопасным

TIP

Освобождать объекты в пул.

Пример в fmt.Fprintf:

go
func Fprintf(w io.Writer, format string, a ...any) (n int, err error) {
   p := newPrinter()
   p.doPrintf(format, a)
   n, err = w.Write(p.buf)
   p.free()
   return
}

Map

sync.Map — потокобезопасный map:

go
func (m *Map) Load(key any) (value any, ok bool)
func (m *Map) Store(key, value any)
func (m *Map) Delete(key any)
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
func (m *Map) Range(f func(key, value any) bool)

Пример:

go
func main() {
  var syncMap sync.Map
  syncMap.Store("a", 1)
  syncMap.Store("a", "a")
  fmt.Println(syncMap.Load("a"))
  fmt.Println(syncMap.LoadAndDelete("a"))
  fmt.Println(syncMap.LoadOrStore("a", "hello world"))
  syncMap.Store("b", "goodbye world")
  syncMap.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true
  })
}

Вывод:

a true
a true
hello world false
a hello world
b goodbye world

Пример параллельного использования:

go
func main() {
  var syncMap sync.Map
  var wait sync.WaitGroup
  wait.Add(10)
  for i := 0; i < 10; i++ {
    go func(n int) {
      for i := 0; i < 100; i++ {
        syncMap.Store(n, n)
      }
      wait.Done()
    }(i)
  }
  wait.Wait()
  syncMap.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true
  })
}

Вывод:

8 8
3 3
1 1
9 9
6 6
5 5
7 7
0 0
2 2
4 4

Производительность sync.Map ниже в 10-100 раз.

Атомарные операции

Атомарные операции не делятся, не прерываются.

Типы

Пакет sync/atomic предоставляет:

go
atomic.Bool{}
atomic.Pointer[]{}
atomic.Int32{}
atomic.Int64{}
atomic.Uint32{}
atomic.Uint64{}
atomic.Uintptr{}
atomic.Value{}

TIP

Реализация на ассемблере plan9.

Использование

Методы:

  • Load(): получение значения
  • Swap(newVal type) (old type): обмен
  • Store(val type): хранение

Пример:

go
func main() {
  var aint64 atomic.Uint64
  aint64.Store(64)
  aint64.Swap(128)
  aint64.Add(112)
  fmt.Println(aint64.Load())
}

Или функции:

go
func main() {
   var aint64 int64
   atomic.StoreInt64(&aint64, 64)
   atomic.SwapInt64(&aint64, 128)
   atomic.AddInt64(&aint64, 112)
   fmt.Println(atomic.LoadInt64(&aint64))
}

Вывод:

240

CAS

CompareAndSwap — основа оптимистичных блокировок:

go
var count int64

func Add(num int64) {
  for {
    expect := atomic.LoadInt64(&count)
    if atomic.CompareAndSwapInt64(&count, expect, expect+num) {
      break
    }
  }
}

Сигнатура:

go
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

TIP

Проблема ABA требует version.

Value

atomic.Value хранит любой тип:

go
type Value struct {
   v any
}

Нельзя хранить nil, типы должны совпадать:

go
func main() {
   var val atomic.Value
   val.Store(nil)
}
// panic: sync/atomic: store of nil value into Value
go
func main() {
   var val atomic.Value
   val.Store("hello world")
   val.Store(114514)
}
// panic: sync/atomic: store of inconsistently typed value into Value

Атомарные типы не копировать, использовать указатели.

Golang by www.golangdev.cn edit