Параллелизм
Поддержка параллелизма в Go естественна — это ядро языка. Язык прост в освоении, разработчики могут создавать параллельные приложения без глубокого понимания внутренней реализации.
Горутины
Горутина (coroutine) — лёгкий поток, поток пользовательского уровня, не управляется напрямую ОС, а планировщиком Go. Переключение контекста имеет малые накладные расходы, поэтому параллелизм Go эффективен. Концепция горутин не нова, Go не первый язык с горутинами, но первый, сделавший их простыми и элегантными.
В Go создание горутины просто — ключевое слово go перед вызовом функции:
TIP
Встроенные функции с возвращаемым значением нельзя использовать после go:
go make([]int,10) // go discards result of make([]int, 10) (value of type []int)func main() {
go fmt.Println("hello world!")
go hello()
go func() {
fmt.Println("hello world!")
}()
}
func hello() {
fmt.Println("hello world!")
}Все три способа работают. Но пример ничего не выведет — горутины выполняются параллельно, система тратит время на создание, главная горутина завершается, дочерние тоже завершаются. Порядок выполнения неопределён:
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
fmt.Println("end")
}Возможные выводы:
start
endИли:
start
0
1
5
3
4
6
7
endПростейшее решение — time.Sleep:
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
fmt.Println("end")
}Вывод:
start
0
1
2
3
4
5
6
7
8
9
endНо проблема параллелизма не решена. Факторов много: время выполнения, порядок, затраты. Если задача сложная,耗时不确定,问题重现:
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go hello(i)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
fmt.Println("end")
}
func hello(i int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
fmt.Println(i)
}Вывод неопределён:
start
0
3
4
endtime.Sleep — не решение. Go предоставляет средства контроля:
channel: каналWaitGroup: семафорContext: контекст
WaitGroup контролирует группу горутин, Context подходит для вложенных горутин, каналы — для коммуникации. Для блокировок:
Mutex: мьютексRWMutex: блокировка чтения-записи
Каналы
channel — канал. Объяснение Go:
Do not communicate by sharing memory; instead, share memory by communicating.
Обмен памятью через сообщения. channel — решение коммуникации горутин, также для контроля параллелизма.
Синтаксис: ключевое слово chan представляет тип канала, должен объявить тип хранимых данных:
var ch chan intКанал не инициализирован, значение nil, нельзя использовать.
Создание
Создание канала — функция make, принимает тип канала и опциональный размер буфера:
intCh := make(chan int)
strCh := make(chan string, 1)После использования канал нужно закрыть функцией close:
func close(c chan<- Type)Пример:
func main() {
intCh := make(chan int)
close(intCh)
}Иногда лучше использовать defer.
Чтение/запись
Операторы для операций:
ch <-: запись в канал
<- ch: чтение из канала
<- показывает направление потока. Пример для int:
func main() {
intCh := make(chan int, 1)
defer close(intCh)
intCh <- 114514
fmt.Println(<-intCh)
}Создан буферизированный канал, записано 114514, прочитано и выведено, закрыто. Чтение имеет второе возвращаемое значение — булево, успешность чтения:
ints, ok := <-intChПоток данных как очередь FIFO. Операции синхронны — в момент времени только одна горутина пишет, одна читает.
Без буфера
Буфер без буфера ёмкостью 0, не хранит данные. При записи требуется немедленное чтение, иначе блокировка. Чтение аналогично. Код вызывает deadlock:
func main() {
ch := make(chan int)
defer close(ch)
ch <- 123
n := <-ch
fmt.Println(n)
}Правильно — новая горутина для отправки:
func main() {
ch := make(chan int)
defer close(ch)
go func() {
ch <- 123
}()
n := <-ch
fmt.Println(n)
}С буфером
Канал с буфером как блокирующая очередь. Чтение пустого и запись полного канала блокируют. Без буфера требует немедленного приёма. С буфером данные помещаются в буфер, блокировка при заполнении. Чтение из буфера до опустошения, затем блокировка. Код работает:
func main() {
ch := make(chan int, 1)
defer close(ch)
ch <- 123
n := <-ch
fmt.Println(n)
}Но синхронное чтение/запись опасны — при пустом/полном буфере блокировка. Пример:
func main() {
ch := make(chan int, 5)
chW := make(chan struct{})
chR := make(chan struct{})
defer func() {
close(ch)
close(chW)
close(chR)
}()
go func() {
for i := 0; i < 10; i++ {
ch <- i
fmt.Println("запись", i)
}
chW <- struct{}{}
}()
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
fmt.Println("чтение", <-ch)
}
chR <- struct{}{}
}()
fmt.Println("запись завершена", <-chW)
fmt.Println("чтение завершено", <-chR)
}Создано 3 канала: буферизированный для коммуникации, два без буфера для синхронизации. Чтение занимает 1 мс, запись максимум 5 данных (буфер заполнен). Вывод:
запись 0
запись 1
запись 2
запись 3
запись 4
чтение 0
запись 5
чтение 1
запись 6
чтение 2
запись 7
чтение 3
запись 8
запись 9
чтение 4
запись завершена {}
чтение 5
чтение 6
чтение 7
чтение 8
чтение 9
чтение завершено {}TIP
len возвращает количество данных в буфере, cap — размер буфера:
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 2
ch <- 3
fmt.Println(len(ch), cap(ch))
}Вывод:
3 5Пример ожидания завершения горутины:
func main() {
ch := make(chan struct{})
defer close(ch)
go func() {
fmt.Println(2)
ch <- struct{}{}
}()
<-ch
fmt.Println(1)
}Вывод:
2
1Пример мьютекса через канал:
var count = 0
var lock = make(chan struct{}, 1)
func Add() {
lock <- struct{}{}
fmt.Println("счет", count, "сложение")
count += 1
<-lock
}
func Sub() {
lock <- struct{}{}
fmt.Println("счет", count, "вычитание")
count -= 1
<-lock
}Буфер размером 1, только один элемент. При записи другими горутинами блокировка до освобождения. В момент времени только одна горутина изменяет count.
Примечания
Блокировка канала:
Чтение/запись без буфера
Синхронные операции блокируют:
func main() {
intCh := make(chan int)
defer close(intCh)
intCh <- 1
ints, ok := <-intCh
fmt.Println(ints, ok)
}Чтение пустого буфера
func main() {
intCh := make(chan int, 1)
defer close(intCh)
ints, ok := <-intCh
fmt.Println(ints, ok)
}Запись полного буфера
func main() {
intCh := make(chan int, 1)
defer close(intCh)
intCh <- 1
intCh <- 1
}Канал nil
func main() {
var intCh chan int
intCh <- 1
}func main() {
var intCh chan int
fmt.Println(<-intCh)
}Паника:
Закрытие nil канала
func main() {
var intCh chan int
close(intCh)
}Запись в закрытый канал
func main() {
intCh := make(chan int, 1)
close(intCh)
intCh <- 1
}Закрытие закрытого канала
func main() {
ch := make(chan int, 1)
defer close(ch)
go write(ch)
fmt.Println(<-ch)
}
func write(ch chan<- int) {
ch <- 1
close(ch)
}Односторонние каналы
Двусторонний канал — чтение и запись. Односторонний — только чтение или запись. Односторонние каналы ограничивают поведение, используются в параметрах и возвращаемых значениях:
func close(c chan<- Type)
func After(d Duration) <-chan TimeСинтаксис:
<-chan int— только чтениеchan<- string— только запись
Запись в канал только чтения не компилируется:
func main() {
timeCh := time.After(time.Second)
timeCh <- time.Now()
}Ошибка:
invalid operation: cannot send to receive-only channel timeChДвусторонний канал преобразуется в односторонний, обратно нет:
func main() {
ch := make(chan int, 1)
go write(ch)
fmt.Println(<-ch)
}
func write(ch chan<- int) {
ch <- 1
}TIP
chan — ссылочный тип, даже при передаче по значению ссылка остаётся той же.
for range
for range читает буферизированный канал:
func main() {
ch := make(chan int, 10)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
for n := range ch {
fmt.Println(n)
}
}for range имеет одно возвращаемое значение, блокируется при пустом буфере. Вывод:
0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!Deadlock — подгорутина завершилась, главная ждёт. Нужно закрыть канал:
func main() {
ch := make(chan int, 10)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}()
for n := range ch {
fmt.Println(n)
}
}Чтение имеет два возвращаемых значения. for range выходит при неудачном чтении. Второе значение — успешность чтения, не закрытие канала. Закрытый буферизированный канал читается с true:
func main() {
ch := make(chan int, 10)
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
for i := 0; i < 6; i++ {
n, ok := <-ch
fmt.Println(n, ok)
}
}Вывод:
0 true
1 true
2 true
3 true
4 true
0 falseЗакрытый канал не блокирует, шестое чтение — нулевое значение, ok — false.
TIP
Закрывать канал следует на стороне отправителя, не получателя.
WaitGroup
sync.WaitGroup — структура пакета sync для ожидания группы горутин. Три метода:
Add указывает количество горутин:
func (wg *WaitGroup) Add(delta int)Done завершает горутины:
func (wg *WaitGroup) Done()Wait ожидает завершения:
func (wg *WaitGroup) Wait()WaitGroup прост в использовании. Реализация — счётчик + семафор. Add инициализирует, Done уменьшает на 1, Wait блокирует до 0. Пример:
func main() {
var wait sync.WaitGroup
wait.Add(1)
go func() {
fmt.Println(1)
wait.Done()
}()
wait.Wait()
fmt.Println(2)
}Вывод:
1
2Пример с циклом:
func main() {
var mainWait sync.WaitGroup
var wait sync.WaitGroup
mainWait.Add(10)
fmt.Println("start")
for i := 0; i < 10; i++ {
wait.Add(1)
go func() {
fmt.Println(i)
wait.Done()
mainWait.Done()
}()
wait.Wait()
}
mainWait.Wait()
fmt.Println("end")
}Вывод:
start
0
1
2
3
4
5
6
7
8
9
endWaitGroup не следует копировать, передавать указатель:
func main() {
var mainWait sync.WaitGroup
mainWait.Add(1)
hello(mainWait)
mainWait.Wait()
fmt.Println("end")
}
func hello(wait sync.WaitGroup) {
fmt.Println("hello")
wait.Done()
}Ошибка deadlock:
hello
fatal error: all goroutines are asleep - deadlock!TIP
Отрицательный счётчик или больше количества горутин вызывает panic.
Context
Context — решение контроля параллелизма, лучше контролирует вложенные горутины. Context — интерфейс, реализации:
emptyCtxcancelCtxtimerCtxvalueCtx
Context
Определение интерфейса:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key any) any
}Deadline
Возвращает время отмены и наличие установки:
Deadline() (deadline time.Time, ok bool)Done
Канал только чтения, закрывается при отмене:
Done() <-chan struct{}Err
Возвращает причину закрытия:
Err() errorValue
Возвращает значение по ключу:
Value(key any) anyemptyCtx
Пустой контекст, создаётся через context.Background и context.TODO:
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}emptyCtx — тип int, экземпляры имеют разные адреса, не отменяется, нет deadline, нет значений:
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return }
func (*emptyCtx) Done() <-chan struct{} { return nil }
func (*emptyCtx) Err() error { return nil }
func (*emptyCtx) Value(key any) any { return nil }emptyCtx — верхнеуровневый контекст, родитель для других.
valueCtx
valueCtx содержит пару ключ-значение и встроенный Context:
type valueCtx struct {
Context
key, val any
}Реализован метод Value, ищет в родителе:
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}Пример:
var waitGroup sync.WaitGroup
func main() {
waitGroup.Add(1)
go Do(context.WithValue(context.Background(), 1, 2))
waitGroup.Wait()
}
func Do(ctx context.Context) {
ticker := time.NewTimer(time.Second)
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
case <-ticker.C:
fmt.Println("timeout")
return
default:
fmt.Println(ctx.Value(1))
}
time.Sleep(time.Millisecond * 100)
}
}Вывод:
2
2
2
2
2
2
2
2
2
2
timeoutcancelCtx
cancelCtx и timerCtx реализуют canceler:
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}cancel не экспортируется, упаковывается в WithCancel:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}cancelCtx — отменяемый контекст. При создании добавляется в children родителя. При вызове cancelFunc канал Done закрывается, дочерние отменяются, удаляется из родителя. Пример:
var waitGroup sync.WaitGroup
func main() {
bkg := context.Background()
cancelCtx, cancel := context.WithCancel(bkg)
waitGroup.Add(1)
go func(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
fmt.Println("ожидание отмены...")
}
time.Sleep(time.Millisecond * 200)
}
}(cancelCtx)
time.Sleep(time.Second)
cancel()
waitGroup.Wait()
}Вывод:
ожидание отмены...
ожидание отмены...
ожидание отмены...
ожидание отмены...
ожидание отмены...
context canceledВложенный пример:
var waitGroup sync.WaitGroup
func main() {
waitGroup.Add(3)
ctx, cancelFunc := context.WithCancel(context.Background())
go HttpHandler(ctx)
time.Sleep(time.Second)
cancelFunc()
waitGroup.Wait()
}
func HttpHandler(ctx context.Context) {
cancelCtxAuth, cancelAuth := context.WithCancel(ctx)
cancelCtxMail, cancelMail := context.WithCancel(ctx)
defer cancelAuth()
defer cancelMail()
defer waitGroup.Done()
go AuthService(cancelCtxAuth)
go MailService(cancelCtxMail)
for {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
fmt.Println("обработка http...")
}
time.Sleep(time.Millisecond * 200)
}
}
func AuthService(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println("auth отмена", ctx.Err())
return
default:
fmt.Println("auth...")
}
time.Sleep(time.Millisecond * 200)
}
}
func MailService(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println("mail отмена", ctx.Err())
return
default:
fmt.Println("mail...")
}
time.Sleep(time.Millisecond * 200)
}
}Вывод:
обработка http...
auth...
mail...
mail...
auth...
обработка http...
auth...
mail...
обработка http...
обработка http...
auth...
mail...
auth...
обработка http...
mail...
context canceled
auth отмена context canceled
mail отмена context canceledtimerCtx
timerCtx добавляет тайм-аут к cancelCtx. Функции создания: WithDeadline и WithTimeout:
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)timerCtx отменяется по времени, закрывает timer. Пример:
var wait sync.WaitGroup
func main() {
deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
defer cancel()
wait.Add(1)
go func(ctx context.Context) {
defer wait.Done()
for {
select {
case <-ctx.Done():
fmt.Println("отмена", ctx.Err())
return
default:
fmt.Println("ожидание...")
}
time.Sleep(time.Millisecond * 200)
}
}(deadline)
wait.Wait()
}Вывод:
ожидание...
ожидание...
ожидание...
ожидание...
ожидание...
отмена context deadline exceededWithTimeout вызывает WithDeadline:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}TIP
Контекст — ресурс, утечка при отсутствии отмены.
Select
select — решение мультиплексирования каналов. В момент времени проверяется доступность каналов. Синтаксис как switch:
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
select {
case n, ok := <-chA:
fmt.Println(n, ok)
case n, ok := <-chB:
fmt.Println(n, ok)
case n, ok := <-chC:
fmt.Println(n, ok)
default:
fmt.Println("все каналы недоступны")
}
}Использование
select состоит из case и опционального default. Каждый case оперирует одним каналом, чтение или запись. При нескольких доступных case выбирается псевдослучайно. При недоступности всех выполняется default, иначе блокировка. Пример с записью:
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
go func() {
chA <- 1
}()
select {
case n, ok := <-chA:
fmt.Println(n, ok)
case n, ok := <-chB:
fmt.Println(n, ok)
case n, ok := <-chC:
fmt.Println(n, ok)
}
}Для постоянного мониторинга с for:
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
go Send(chA)
go Send(chB)
go Send(chC)
for {
select {
case n, ok := <-chA:
fmt.Println("A", n, ok)
case n, ok := <-chB:
fmt.Println("B", n, ok)
case n, ok := <-chC:
fmt.Println("C", n, ok)
}
}
}
func Send(ch chan<- int) {
for i := 0; i < 3; i++ {
time.Sleep(time.Millisecond)
ch <- i
}
}С тайм-аутом:
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
l := make(chan struct{})
go Send(chA)
go Send(chB)
go Send(chC)
go func() {
Loop:
for {
select {
case n, ok := <-chA:
fmt.Println("A", n, ok)
case n, ok := <-chB:
fmt.Println("B", n, ok)
case n, ok := <-chC:
fmt.Println("C", n, ok)
case <-time.After(time.Second):
break Loop
}
}
l <- struct{}{}
}()
<-l
}
func Send(ch chan<- int) {
for i := 0; i < 3; i++ {
time.Sleep(time.Millisecond)
ch <- i
}
}Вывод:
C 0 true
A 0 true
B 0 true
A 1 true
B 1 true
C 1 true
B 2 true
C 2 true
A 2 trueТайм-аут
time.After возвращает канал только чтения:
func main() {
chA := make(chan int)
defer close(chA)
go func() {
time.Sleep(time.Second * 2)
chA <- 1
}()
select {
case n := <-chA:
fmt.Println(n)
case <-time.After(time.Second):
fmt.Println("тайм-аут")
}
}Блокировка
Пустой select блокирует:
func main() {
fmt.Println("start")
select {}
fmt.Println("end")
}TIP
Операции с nil каналом в case игнорируются:
func main() {
var nilCh chan int
select {
case <-nilCh:
fmt.Println("read")
case nilCh <- 1:
fmt.Println("write")
case <-time.After(time.Second):
fmt.Println("timeout")
}
}Неблокирующий
default в select для неблокирующих операций:
func TrySend(ch chan int, ele int) bool {
select {
case ch <- ele:
return true
default:
return false
}
}
func TryRecv(ch chan int) (int, bool) {
select {
case ele, ok := <-ch:
return ele, ok
default:
return 0, false
}
}Проверка завершения context:
func IsDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}Блокировки
Пример:
var wait sync.WaitGroup
var count = 0
func main() {
wait.Add(10)
for i := 0; i < 10; i++ {
go func(data *int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
temp := *data
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
ans := 1
*data = temp + ans
fmt.Println(*data)
wait.Done()
}(&count)
}
wait.Wait()
fmt.Println("результат", count)
}10 горутин выполняют +1, результат не 10:
1
2
3
3
2
2
3
3
3
4
результат 4Горутина A читает count=1, тратит 400 мс, B обновляет count, A перезаписывает. Нужны блокировки.
sync.Mutex и RWMutex предоставляют API: Lock() и Unlock(). Блокировки нерекурсивны, повторное использование вызывает fatal. Блокировка защищает инварианты:
func DoSomething() {
Lock()
Unlock()
}Мьютекс
sync.Mutex реализует sync.Locker:
type Locker interface {
Lock()
Unlock()
}Пример:
var wait sync.WaitGroup
var count = 0
var lock sync.Mutex
func main() {
wait.Add(10)
for i := 0; i < 10; i++ {
go func(data *int) {
lock.Lock()
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
temp := *data
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
ans := 1
*data = temp + ans
lock.Unlock()
fmt.Println(*data)
wait.Done()
}(&count)
}
wait.Wait()
fmt.Println("результат", count)
}Вывод:
1
2
3
4
5
6
7
8
9
10
результат 10Блокировка чтения-записи
RWMutex для частого чтения, редкой записи:
- Чтение: другие записи блокируются, чтения нет
- Запись: другие записи и чтения блокируются
sync.RWMutex реализует Locker:
func (rw *RWMutex) RLock()
func (rw *RWMutex) TryRLock() bool
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Lock()
func (rw *RWMutex) TryLock() bool
func (rw *RWMutex) Unlock()TryRLock и TryLock неблокирующие. Пример:
var wait sync.WaitGroup
var count = 0
var rw sync.RWMutex
func main() {
wait.Add(12)
go func() {
for i := 0; i < 3; i++ {
go Write(&count)
}
wait.Done()
}()
go func() {
for i := 0; i < 7; i++ {
go Read(&count)
}
wait.Done()
}()
wait.Wait()
fmt.Println("результат", count)
}
func Read(i *int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
rw.RLock()
fmt.Println("чтение")
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
fmt.Println("освобождение", *i)
rw.RUnlock()
wait.Done()
}
func Write(i *int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
rw.Lock()
fmt.Println("запись")
temp := *i
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
*i = temp + 1
fmt.Println("освобождение", *i)
rw.Unlock()
wait.Done()
}Вывод:
чтение
чтение
чтение
чтение
освобождение 0
освобождение 0
освобождение 0
освобождение 0
запись
освобождение 1
чтение
чтение
чтение
освобождение 1
освобождение 1
освобождение 1
запись
освобождение 2
запись
освобождение 3
результат 3TIP
Блокировки передавать указателем.
Условные переменные
sync.Cond — механизм коммуникации:
func NewCond(l Locker) *CondМетоды:
func (c *Cond) Wait()
func (c *Cond) Signal()
func (c *Cond) Broadcast()Пример:
var wait sync.WaitGroup
var count = 0
var rw sync.RWMutex
var cond = sync.NewCond(rw.RLocker())
func main() {
wait.Add(12)
go func() {
for i := 0; i < 3; i++ {
go Write(&count)
}
wait.Done()
}()
go func() {
for i := 0; i < 7; i++ {
go Read(&count)
}
wait.Done()
}()
wait.Wait()
fmt.Println("результат", count)
}
func Read(i *int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
rw.RLock()
fmt.Println("чтение")
for *i < 3 {
cond.Wait()
}
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
fmt.Println("освобождение", *i)
rw.RUnlock()
wait.Done()
}
func Write(i *int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
rw.Lock()
fmt.Println("запись")
temp := *i
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
*i = temp + 1
fmt.Println("освобождение", *i)
rw.Unlock()
cond.Broadcast()
wait.Done()
}Вывод:
чтение
чтение
чтение
чтение
запись
освобождение 1
чтение
запись
освобождение 2
чтение
чтение
запись
освобождение 3
освобождение 3
освобождение 3
освобождение 3
освобождение 3
освобождение 3
освобождение 3
результат 3TIP
Использовать for, не if:
for !condition {
cond.Wait()
}sync
Пакет sync предоставляет инструменты параллелизма.
Once
sync.Once гарантирует однократное выполнение:
func (o *Once) Do(f func())Пример:
var wait sync.WaitGroup
func main() {
var slice MySlice
wait.Add(4)
for i := 0; i < 4; i++ {
go func() {
slice.Add(1)
wait.Done()
}()
}
wait.Wait()
fmt.Println(slice.Len())
}
type MySlice struct {
s []int
o sync.Once
}
func (m *MySlice) Add(i int) {
m.o.Do(func() {
fmt.Println("инициализация")
if m.s == nil {
m.s = make([]int, 0, 10)
}
})
m.s = append(m.s, i)
}
func (m *MySlice) Len() int {
return len(m.s)
}Вывод:
инициализация
4Реализация — блокировка + атомарные операции:
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}Pool
sync.Pool — пул временных объектов:
func (p *Pool) Get() any
func (p *Pool) Put(x any)Поле New:
New func() anyПример:
var wait sync.WaitGroup
var pool sync.Pool
var numOfObject atomic.Int64
type BigMemData struct {
M string
}
func main() {
pool.New = func() any {
numOfObject.Add(1)
return BigMemData{"большая память"}
}
wait.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
val := pool.Get()
_ = val.(BigMemData)
pool.Put(val)
wait.Done()
}()
}
wait.Wait()
fmt.Println(numOfObject.Load())
}Вывод:
5Примечания:
- Временные объекты: могут быть удалены GC
- Непредсказуемо: неизвестно, новый или reused объект
- Потокобезопасность:
Newдолжен быть потокобезопасным
TIP
Освобождать объекты в пул.
Пример в fmt.Fprintf:
func Fprintf(w io.Writer, format string, a ...any) (n int, err error) {
p := newPrinter()
p.doPrintf(format, a)
n, err = w.Write(p.buf)
p.free()
return
}Map
sync.Map — потокобезопасный map:
func (m *Map) Load(key any) (value any, ok bool)
func (m *Map) Store(key, value any)
func (m *Map) Delete(key any)
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
func (m *Map) Range(f func(key, value any) bool)Пример:
func main() {
var syncMap sync.Map
syncMap.Store("a", 1)
syncMap.Store("a", "a")
fmt.Println(syncMap.Load("a"))
fmt.Println(syncMap.LoadAndDelete("a"))
fmt.Println(syncMap.LoadOrStore("a", "hello world"))
syncMap.Store("b", "goodbye world")
syncMap.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})
}Вывод:
a true
a true
hello world false
a hello world
b goodbye worldПример параллельного использования:
func main() {
var syncMap sync.Map
var wait sync.WaitGroup
wait.Add(10)
for i := 0; i < 10; i++ {
go func(n int) {
for i := 0; i < 100; i++ {
syncMap.Store(n, n)
}
wait.Done()
}(i)
}
wait.Wait()
syncMap.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})
}Вывод:
8 8
3 3
1 1
9 9
6 6
5 5
7 7
0 0
2 2
4 4Производительность sync.Map ниже в 10-100 раз.
Атомарные операции
Атомарные операции не делятся, не прерываются.
Типы
Пакет sync/atomic предоставляет:
atomic.Bool{}
atomic.Pointer[]{}
atomic.Int32{}
atomic.Int64{}
atomic.Uint32{}
atomic.Uint64{}
atomic.Uintptr{}
atomic.Value{}TIP
Реализация на ассемблере plan9.
Использование
Методы:
Load(): получение значенияSwap(newVal type) (old type): обменStore(val type): хранение
Пример:
func main() {
var aint64 atomic.Uint64
aint64.Store(64)
aint64.Swap(128)
aint64.Add(112)
fmt.Println(aint64.Load())
}Или функции:
func main() {
var aint64 int64
atomic.StoreInt64(&aint64, 64)
atomic.SwapInt64(&aint64, 128)
atomic.AddInt64(&aint64, 112)
fmt.Println(atomic.LoadInt64(&aint64))
}Вывод:
240CAS
CompareAndSwap — основа оптимистичных блокировок:
var count int64
func Add(num int64) {
for {
expect := atomic.LoadInt64(&count)
if atomic.CompareAndSwapInt64(&count, expect, expect+num) {
break
}
}
}Сигнатура:
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)TIP
Проблема ABA требует version.
Value
atomic.Value хранит любой тип:
type Value struct {
v any
}Нельзя хранить nil, типы должны совпадать:
func main() {
var val atomic.Value
val.Store(nil)
}
// panic: sync/atomic: store of nil value into Valuefunc main() {
var val atomic.Value
val.Store("hello world")
val.Store(114514)
}
// panic: sync/atomic: store of inconsistently typed value into ValueАтомарные типы не копировать, использовать указатели.
