Skip to content

Đồng thời

Go hỗ trợ đồng thời một cách tự nhiên đây là cốt lõi của ngôn ngữ này độ khó khi bắt đầu tương đối thấp nhà phát triển không cần quá quan tâm đến triển khai底层 là có thể tạo ra một ứng dụng đồng thời khá tốt nâng cao giới hạn của nhà phát triển.

Goroutine

Goroutine là một loại thread nhẹ hoặc gọi là thread ở user state không chịu sự điều phối trực tiếp của hệ điều hành mà do scheduler của chính Go thực hiện điều phối runtime vì vậy chi phí chuyển đổi ngữ cảnh rất nhỏ đây cũng là một trong những lý do tại sao hiệu suất đồng thời của Go rất tốt. Khái niệm goroutine không phải do Go đề xuất lần đầu Go cũng không phải là ngôn ngữ đầu tiên hỗ trợ goroutine nhưng Go là ngôn ngữ đầu tiên có thể hỗ trợ goroutine và đồng thời một cách đơn giản và thanh lịch.

Trong Go việc tạo một goroutine rất đơn giản chỉ cần một từ khóa go là có thể nhanh chóng khởi động một goroutine sau từ khóa go phải là một lời gọi hàm. Ví dụ như sau

TIP

Hàm built-in có giá trị trả về không được phép theo sau từ khóa go ví dụ như minh họa sai dưới đây

go
go make([]int,10) //  go discards result of make([]int, 10) (value of type []int)
go
func main() {
  go fmt.Println("hello world!")
  go hello()
  go func() {
    fmt.Println("hello world!")
  }()
}

func hello() {
  fmt.Println("hello world!")
}

Ba cách khởi động goroutine trên đều được nhưng thực tế sau khi ví dụ này thực thi trong hầu hết các trường hợp sẽ không có gì được xuất ra goroutine được thực thi đồng thời hệ thống cần thời gian để tạo goroutine và trước đó goroutine chính đã chạy xong một khi thread chính thoát ra các goroutine con khác cũng tự động thoát. Và thứ tự thực thi của goroutine cũng không xác định được không thể đoán trước ví dụ như ví dụ dưới đây

go
func main() {
  fmt.Println("start")
  for i := 0; i < 10; i++ {
    go fmt.Println(i)
  }
  fmt.Println("end")
}

Đây là ví dụ khởi động goroutine trong vòng lặp không bao giờ có thể dự đoán chính xác nó sẽ xuất ra gì. Có thể goroutine con chưa bắt đầu chạy goroutine chính đã kết thúc trường hợp như sau

start
end

Hoặc chỉ có một phần goroutine con chạy thành công trước khi goroutine chính thoát trường hợp như sau

start
0
1
5
3
4
6
7
end

Cách đơn giản nhất là để goroutine chính đợi một lúc cần sử dụng hàm Sleep trong gói time có thể khiến goroutine hiện tại tạm dừng trong một khoảng thời gian ví dụ như sau

go
func main() {
  fmt.Println("start")
  for i := 0; i < 10; i++ {
    go fmt.Println(i)
  }
    // Tạm dừng 1ms
  time.Sleep(time.Millisecond)
  fmt.Println("end")
}

Thực thi lại xuất như sau có thể thấy tất cả các số đều được xuất ra đầy đủ không thiếu

start
0
1
5
2
3
4
6
8
9
7
end

Nhưng thứ tự vẫn lộn xộn vì vậy để mỗi lần lặp đợi một chút. Ví dụ như sau

go
func main() {
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      go fmt.Println(i)
      time.Sleep(time.Millisecond)
   }
   time.Sleep(time.Millisecond)
   fmt.Println("end")
}

Kết quả xuất bây giờ đã là thứ tự bình thường

start
0
1
2
3
4
5
6
7
8
9
end

Kết quả xuất trong ví dụ trên rất hoàn hảo nhưng vấn đề đồng thời đã được giải quyết chưa không hoàn toàn không. Đối với chương trình đồng thời các yếu tố không kiểm soát được rất nhiều thời điểm thực thi thứ tự trước sau thời gian xử lý của quá trình thực thi v.v. nếu công việc của goroutine con trong vòng lặp không chỉ là xuất một số đơn giản mà là một tác vụ rất lớn và phức tạp thời gian không xác định thì vẫn sẽ tái hiện vấn đề trước đó. Ví dụ như mã dưới đây

go
func main() {
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      go hello(i)
      time.Sleep(time.Millisecond)
   }
   time.Sleep(time.Millisecond)
   fmt.Println("end")
}

func hello(i int) {
   // Mô phỏng thời gian xử lý ngẫu nhiên
   time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
   fmt.Println(i)
}

Kết quả xuất của đoạn mã này vẫn không xác định dưới đây là một trong những trường hợp có thể

start
0
3
4
end

Vì vậy time.Sleep không phải là cách giải quyết tốt may mắn thay Go cung cấp rất nhiều phương tiện kiểm soát đồng thời常用的 có ba phương pháp kiểm soát đồng thời

  • channel: ống dẫn
  • WaitGroup: semaphore
  • Context: ngữ cảnh

Ba phương pháp có tình huống áp dụng khác nhau WaitGroup có thể kiểm soát động một nhóm goroutine có số lượng chỉ định Context phù hợp hơn với trường hợp层级 goroutine con cháu sâu hơn ống dẫn phù hợp hơn cho giao tiếp giữa các goroutine. Đối với kiểm soát khóa truyền thống Go cũng hỗ trợ

  • Mutex: khóa loại trừ lẫn nhau
  • RWMutex: khóa đọc ghi loại trừ lẫn nhau

Channel

channel dịch là ống dẫn Go giải thích về tác dụng của ống dẫn như sau

Do not communicate by sharing memory; instead, share memory by communicating.

Tức là chia sẻ bộ nhớ thông qua tin nhắn channel sinh ra để làm điều này nó là một giải pháp giao tiếp giữa các goroutine đồng thời cũng có thể dùng để kiểm soát đồng thời trước tiên hãy làm quen với cú pháp cơ bản của channel. Trong Go sử dụng từ khóa chan để đại diện cho loại ống dẫn đồng thời cũng phải khai báo loại lưu trữ của ống dẫn để chỉ định dữ liệu mà nó lưu trữ là loại gì ví dụ dưới đây là hình dạng của một ống dẫn thông thường.

go
var ch chan int

Đây là câu lệnh khai báo một ống dẫn lúc này ống dẫn chưa được khởi tạo giá trị của nó là nil không thể sử dụng trực tiếp.

Tạo

Khi tạo ống dẫn chỉ có một và chỉ một cách đó là sử dụng hàm built-in make đối với ống dẫn hàm make nhận hai tham số tham số đầu tiên là loại ống dẫn tham số thứ hai là tùy chọn là kích thước buffer của ống dẫn. Ví dụ như sau

go
intCh := make(chan int)
// Ống dẫn có buffer kích thước 1
strCh := make(chan string, 1)

Sau khi sử dụng xong một ống dẫn nhất định phải nhớ đóng ống dẫn đó sử dụng hàm built-in close để đóng một ống dẫn chữ ký hàm như sau.

go
func close(c chan<- Type)

Một ví dụ đóng ống dẫn như sau

go
func main() {
  intCh := make(chan int)
  // làm gì đó
  close(intCh)
}

Đôi khi sử dụng defer để đóng ống dẫn có thể tốt hơn.

Đọc ghi

Đối với một ống dẫn Go sử dụng hai toán tử rất sinh động để biểu thị thao tác đọc ghi

ch <-: biểu thị ghi dữ liệu vào một ống dẫn

<- ch: biểu thị đọc dữ liệu từ một ống dẫn

<- rất sinh động biểu thị hướng dòng chảy của dữ liệu xem một ví dụ đọc ghi ống dẫn loại int

go
func main() {
    // Nếu không có buffer sẽ dẫn đến deadlock
  intCh := make(chan int, 1)
  defer close(intCh)
    // Ghi dữ liệu
  intCh <- 114514
    // Đọc dữ liệu
  fmt.Println(<-intCh)
}

Ví dụ trên tạo một ống dẫn loại int có kích thước buffer là 1 ghi dữ liệu 114514 vào nó sau đó đọc dữ liệu và xuất ra cuối cùng đóng ống dẫn đó. Đối với thao tác đọc còn có giá trị trả về thứ hai là một giá trị kiểu bool dùng để biểu thị dữ liệu có đọc thành công hay không

go
ints, ok := <-intCh

Cách dòng chảy của dữ liệu trong ống dẫn giống như hàng đợi tức là vào trước ra trước (FIFO) thao tác của goroutine đối với ống dẫn là đồng bộ tại một thời điểm chỉ có một goroutine có thể ghi dữ liệu vào nó đồng thời cũng chỉ có một goroutine có thể đọc dữ liệu từ ống dẫn.

Không buffer

Đối với ống dẫn không buffer vì dung lượng buffer là 0 nên không thể lưu trữ tạm thời bất kỳ dữ liệu nào. Chính vì ống dẫn không buffer không thể lưu trữ dữ liệu khi ghi dữ liệu vào ống dẫn phải lập tức có goroutine khác đến đọc nếu không sẽ chặn đợi đọc dữ liệu cũng tương tự điều này giải thích tại sao mã trông có vẻ bình thường dưới đây lại xảy ra deadlock.

go
func main() {
  // Tạo ống dẫn không buffer
  ch := make(chan int)
  defer close(ch)
  // Ghi dữ liệu
  ch <- 123
  // Đọc dữ liệu
  n := <-ch
  fmt.Println(n)
}

Ống dẫn không buffer không nên sử dụng đồng bộ đúng ra nên khởi động một goroutine mới để gửi dữ liệu như ví dụ dưới

go
func main() {
  // Tạo ống dẫn không buffer
  ch := make(chan int)
  defer close(ch)
  go func() {
    // Ghi dữ liệu
    ch <- 123
  }()
  // Đọc dữ liệu
  n := <-ch
  fmt.Println(n)
}

Có buffer

Khi ống dẫn có buffer giống như một hàng đợi chặn đọc ống dẫn rỗng và ghi vào ống dẫn đã đầy sẽ gây chặn. Ống dẫn không buffer khi gửi dữ liệu phải lập tức có người nhận nếu không sẽ chặn mãi. Đối với ống dẫn có buffer thì không cần như vậy khi ghi dữ liệu vào ống dẫn có buffer sẽ đưa dữ liệu vào buffer trước chỉ khi dung lượng buffer đầy mới chặn đợi goroutine đến đọc dữ liệu từ ống dẫn. Tương tự khi đọc ống dẫn có buffer sẽ đọc từ buffer trước cho đến khi buffer hết dữ liệu mới chặn đợi goroutine đến ghi dữ liệu vào ống dẫn. Vì vậy ví dụ gây deadlock trong ống dẫn không buffer ở đây có thể chạy trơn tru.

go
func main() {
   // Tạo ống dẫn có buffer
   ch := make(chan int, 1)
   defer close(ch)
   // Ghi dữ liệu
   ch <- 123
   // Đọc dữ liệu
   n := <-ch
   fmt.Println(n)
}

Mặc dù có thể chạy trơn tru nhưng cách đọc ghi đồng bộ này rất nguy hiểm một khi buffer của ống dẫn hết hoặc đầy sẽ chặn mãi vì không có goroutine khác đến ghi hoặc đọc dữ liệu từ ống dẫn. Xem ví dụ dưới đây

go
func main() {
  // Tạo ống dẫn có buffer
  ch := make(chan int, 5)
  // Tạo hai ống dẫn không buffer
  chW := make(chan struct{})
  chR := make(chan struct{})
  defer func() {
    close(ch)
    close(chW)
    close(chR)
  }()
  // Phụ trách ghi
  go func() {
    for i := 0; i < 10; i++ {
      ch <- i
      fmt.Println("ghi", i)
    }
    chW <- struct{}{}
  }()
  // Phụ trách đọc
  go func() {
    for i := 0; i < 10; i++ {
            // Mỗi lần đọc dữ liệu đều tốn 1ms
      time.Sleep(time.Millisecond)
      fmt.Println("đọc", <-ch)
    }
    chR <- struct{}{}
  }()
  fmt.Println("ghi xong", <-chW)
  fmt.Println("đọc xong", <-chR)
}

Ở đây tổng cộng tạo 3 ống dẫn một ống dẫn có buffer dùng để giao tiếp giữa các goroutine hai ống dẫn không buffer dùng để đồng bộ thứ tự thực thi của goroutine cha con. Goroutine phụ trách đọc mỗi lần đọc đều đợi 1ms goroutine phụ trách ghi nhiều nhất chỉ có thể ghi 5 dữ liệu vì buffer của ống dẫn tối đa chỉ có 5 trước khi có goroutine đến đọc chỉ có thể chặn đợi. Vì vậy kết quả xuất của ví dụ này như sau

ghi 0
ghi 1
ghi 2
ghi 3
ghi 4 // Ghi 5 cái buffer đầy đợi goroutine khác đến đọc
đọc 0
ghi 5 // Đọc một ghi một
đọc 1
ghi 6
đọc 2
ghi 7
đọc 3
ghi 8
ghi 9
đọc 4
ghi xong {} // Tất cả dữ liệu đã gửi xong goroutine ghi thực thi xong
đọc 5
đọc 6
đọc 7
đọc 8
đọc 9
đọc xong {} // Tất cả dữ liệu đã đọc xong goroutine đọc thực thi xong

Có thể thấy goroutine phụ trách ghi lúc mới bắt đầu đã gửi 5 dữ liệu một hơi sau khi buffer đầy bắt đầu chặn đợi goroutine đọc đến đọc sau đó mỗi khi goroutine đọc đọc một dữ liệu mỗi 1ms buffer có chỗ trống goroutine ghi sẽ ghi một dữ liệu cho đến khi tất cả dữ liệu được gửi xong goroutine ghi kết thúc thực thi sau đó khi goroutine đọc đọc hết tất cả dữ liệu trong buffer goroutine đọc cũng kết thúc thực thi cuối cùng goroutine chính thoát.

TIP

Thông qua hàm built-in len có thể truy cập số lượng dữ liệu trong buffer của ống dẫn thông qua cap có thể truy cập kích thước buffer của ống dẫn.

go
func main() {
   ch := make(chan int, 5)
   ch <- 1
   ch <- 2
   ch <- 3
   fmt.Println(len(ch), cap(ch))
}

Kết quả xuất

3 5

Sử dụng điều kiện chặn của ống dẫn có thể dễ dàng viết ra ví dụ goroutine chính đợi goroutine con thực thi xong

go
func main() {
   // Tạo một ống dẫn không buffer
   ch := make(chan struct{})
   defer close(ch)
   go func() {
      fmt.Println(2)
      // Ghi
      ch <- struct{}{}
   }()
   // Chặn đợi đọc
   <-ch
   fmt.Println(1)
}

Kết quả xuất

2
1

Sử dụng ống dẫn có buffer cũng có thể thực hiện một khóa loại trừ lẫn nhau đơn giản xem ví dụ dưới đây

go
var count = 0

// Ống dẫn có buffer kích thước 1
var lock = make(chan struct{}, 1)

func Add() {
    // Khóa
  lock <- struct{}{}
  fmt.Println("đếm hiện tại là", count, "thực hiện phép cộng")
  count += 1
    // Mở khóa
  <-lock
}

func Sub() {
    // Khóa
  lock <- struct{}{}
  fmt.Println("đếm hiện tại là", count, "thực hiện phép trừ")
  count -= 1
    // Mở khóa
  <-lock
}

Vì kích thước buffer của ống dẫn là 1 nhiều nhất chỉ có một dữ liệu được lưu trữ trong buffer. Hàm AddSub trước mỗi thao tác đều cố gắng gửi dữ liệu vào ống dẫn vì kích thước buffer là 1 nếu có goroutine khác đã ghi dữ liệu buffer đã đầy goroutine hiện tại phải chặn đợi cho đến khi buffer có chỗ trống như vậy tại một thời điểm nhiều nhất chỉ có một goroutine có thể sửa đổi biến count như vậy đã thực hiện một khóa loại trừ lẫn nhau đơn giản.

Điểm cần lưu ý

Dưới đây là một số tổng hợp các trường hợp sau sử dụng không đúng sẽ dẫn đến ống dẫn chặn

Đọc ghi ống dẫn không buffer

Khi thực hiện thao tác đọc ghi đồng bộ trực tiếp trên một ống dẫn không buffer sẽ khiến goroutine hiện tại chặn

go
func main() {
   // Tạo một ống dẫn không buffer
   intCh := make(chan int)
   defer close(intCh)
   // Gửi dữ liệu
   intCh <- 1
   // Đọc dữ liệu
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

Đọc ống dẫn buffer rỗng

Khi đọc một ống dẫn có buffer rỗng sẽ khiến goroutine hiện tại chặn

go
func main() {
   // Tạo ống dẫn có buffer
   intCh := make(chan int, 1)
   defer close(intCh)
   // Buffer rỗng chặn đợi goroutine khác ghi dữ liệu
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

Ghi vào ống dẫn buffer đầy

Khi buffer của ống dẫn đã đầy ghi dữ liệu vào sẽ khiến goroutine hiện tại chặn

go
func main() {
  // Tạo ống dẫn có buffer
  intCh := make(chan int, 1)
  defer close(intCh)

  intCh <- 1
    // Đầy rồi chặn đợi goroutine khác đến đọc dữ liệu
  intCh <- 1
}

Ống dẫn là nil

Khi ống dẫn là nil bất kể đọc ghi như thế nào đều sẽ khiến goroutine hiện tại chặn

go
func main() {
  var intCh chan int
    // Ghi
  intCh <- 1
}
go
func main() {
  var intCh chan int
    // Đọc
  fmt.Println(<-intCh)
}

Về điều kiện chặn của ống dẫn cần nắm vững và quen thuộc trong hầu hết các trường hợp những vấn đề này ẩn rất kín đáo sẽ không trực quan như trong ví dụ.

Các trường hợp sau đây còn dẫn đến panic

Đóng một ống dẫn nil

Khi ống dẫn là nil sử dụng hàm close để đóng nó sẽ dẫn đến panic

go
func main() {
  var intCh chan int
  close(intCh)
}

Ghi vào ống dẫn đã đóng

Ghi dữ liệu vào một ống dẫn đã đóng sẽ dẫn đến panic

go
func main() {
  intCh := make(chan int, 1)
  close(intCh)
  intCh <- 1
}

Đóng ống dẫn đã đóng

Trong một số trường hợp ống dẫn có thể được truyền qua nhiều tầng người gọi có lẽ cũng không biết rốt cuộc nên đóng ống dẫn bởi ai như vậy có thể xảy ra việc đóng một ống dẫn đã đóng sẽ xảy ra panic.

go
func main() {
  ch := make(chan int, 1)
  defer close(ch)
  go write(ch)
  fmt.Println(<-ch)
}

func write(ch chan<- int) {
  // Chỉ có thể ghi dữ liệu vào ống dẫn
  ch <- 1
  close(ch)
}

Ống dẫn một chiều

Ống dẫn hai chiều đề cập đến việc vừa có thể ghi vừa có thể đọc tức là có thể thao tác ở hai bên ống dẫn. Ống dẫn một chiều đề cập đến ống dẫn chỉ đọc hoặc chỉ ghi tức là chỉ có thể thao tác ở một bên ống dẫn. Tự tạo một ống dẫn chỉ đọc hoặc chỉ ghi không có ý nghĩa quá lớn vì không thể đọc ghi ống dẫn sẽ mất đi tác dụng tồn tại của nó. Ống dẫn một chiều thường được dùng để hạn chế hành vi của ống dẫn thường xuất hiện trong tham số hàm và giá trị trả về ví dụ hàm built-in close dùng để đóng ống dẫn đã sử dụng ống dẫn một chiều trong chữ ký hàm.

go
func close(c chan<- Type)

Hoặc hàm After trong gói time thường dùng

go
func After(d Duration) <-chan Time

Tham số của hàm close là một ống dẫn chỉ ghi giá trị trả về của hàm After là một ống dẫn chỉ đọc vì vậy cú pháp của ống dẫn một chiều như sau

  • Dấu mũi tên <- ở trước là ống dẫn chỉ đọc như <-chan int
  • Dấu mũi tên <- ở sau là ống dẫn chỉ ghi như chan<- string

Khi cố gắng ghi dữ liệu vào ống dẫn chỉ đọc sẽ không thể biên dịch

go
func main() {
  timeCh := time.After(time.Second)
  timeCh <- time.Now()
}

Báo lỗi như sau ý rất rõ ràng

invalid operation: cannot send to receive-only channel timeCh (variable of type <-chan time.Time)

Đọc từ ống dẫn chỉ ghi cũng tương tự.

Ống dẫn hai chiều có thể chuyển đổi thành ống dẫn một chiều ngược lại thì không. Thông thường khi truyền ống dẫn hai chiều cho một goroutine hoặc hàm nào đó và không muốn nó đọc/gửi dữ liệu có thể sử dụng ống dẫn một chiều để hạn chế hành vi của phía kia.

go
func main() {
   ch := make(chan int, 1)
   go write(ch)
   fmt.Println(<-ch)
}

func write(ch chan<- int) {
   // Chỉ có thể gửi dữ liệu vào ống dẫn
   ch <- 1
}

Ống dẫn chỉ đọc cũng vậy

TIP

chan là loại tham chiếu mặc dù Go truyền tham số hàm là truyền giá trị nhưng tham chiếu vẫn là cùng một điều này sẽ được giải thích trong nguyên lý ống dẫn sau.

for range

Thông qua câu lệnh for range có thể遍历 đọc dữ liệu từ ống dẫn buffer như ví dụ dưới

go
func main() {
  ch := make(chan int, 10)
  go func() {
    for i := 0; i < 10; i++ {
      ch <- i
    }
  }()
  for n := range ch {
    fmt.Println(n)
  }
}

Thông thường for range遍历 các cấu trúc dữ liệu có thể lặp khác sẽ có hai giá trị trả về giá trị đầu tiên là chỉ số giá trị thứ hai là giá trị phần tử nhưng đối với ống dẫn chỉ có một và chỉ một giá trị trả về for range sẽ liên tục đọc phần tử trong ống dẫn khi buffer của ống dẫn rỗng hoặc không buffer sẽ chặn đợi cho đến khi có goroutine khác ghi dữ liệu vào ống dẫn mới tiếp tục đọc dữ liệu. Vì vậy kết quả xuất như sau

0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!

Có thể thấy mã trên đã xảy ra deadlock vì goroutine con đã thực thi xong mà goroutine chính vẫn chặn đợi goroutine khác đến ghi dữ liệu vào ống dẫn vì vậy nên đóng ống dẫn sau khi ghi xong. Sửa thành mã dưới đây

go
func main() {
   ch := make(chan int, 10)
   go func() {
      for i := 0; i < 10; i++ {
         ch <- i
      }
      // Đóng ống dẫn
      close(ch)
   }()
   for n := range ch {
      fmt.Println(n)
   }
}

Sau khi ghi xong đóng ống dẫn mã trên sẽ không xảy ra deadlock nữa.前面 đã đề cập đến đọc ống dẫn có hai giá trị trả về khi for range遍历 ống dẫn khi không thể đọc dữ liệu thành công sẽ thoát khỏi vòng lặp. Giá trị trả về thứ hai đề cập đến việc có thể đọc dữ liệu thành công hay không chứ không phải ống dẫn đã đóng hay chưa mặc dù ống dẫn đã đóng đối với ống dẫn có buffer vẫn có thể đọc dữ liệu và giá trị trả về thứ hai vẫn là true. Xem ví dụ dưới đây

go
func main() {
  ch := make(chan int, 10)
  for i := 0; i < 5; i++ {
    ch <- i
  }
    // Đóng ống dẫn
  close(ch)
    // Đọc dữ liệu thêm
  for i := 0; i < 6; i++ {
    n, ok := <-ch
    fmt.Println(n, ok)
  }
}

Kết quả xuất

0 true
1 true
2 true
3 true
4 true
0 false

Vì ống dẫn đã đóng rồi cho dù buffer rỗng đọc thêm dữ liệu cũng không khiến goroutine hiện tại chặn có thể thấy ở lần遍历 thứ sáu đọc được giá trị 0 và okfalse.

TIP

Về thời điểm đóng ống dẫn nên đóng ống dẫn ở phía gửi dữ liệu vào ống dẫn chứ không nên đóng ở phía nhận vì trong hầu hết các trường hợp phía nhận chỉ biết nhận dữ liệu không biết nên đóng ống dẫn lúc nào.

WaitGroup

sync.WaitGroup là một struct được cung cấp trong gói sync WaitGroup tức là đợi thực thi sử dụng nó có thể dễ dàng thực hiện hiệu ứng đợi một nhóm goroutine. Struct này chỉ expose ba phương thức ra bên ngoài.

Phương thức Add dùng để chỉ định số lượng goroutine cần đợi

go
func (wg *WaitGroup) Add(delta int)

Phương thức Done biểu thị goroutine hiện tại đã thực thi xong

go
func (wg *WaitGroup) Done()

Phương thức Wait đợi goroutine con kết thúc nếu không sẽ chặn

go
func (wg *WaitGroup) Wait()

WaitGroup sử dụng rất đơn giản thuộc loại mở hộp là dùng. Triển khai bên trong của nó là bộ đếm + semaphore khi chương trình bắt đầu gọi Add để khởi tạo đếm mỗi khi một goroutine thực thi xong gọi Done đếm sẽ -1 cho đến khi giảm về 0 trong thời gian này goroutine chính gọi Wait sẽ chặn cho đến khi tất cả đếm giảm về 0 rồi mới được đánh thức. Xem một ví dụ sử dụng đơn giản

go
func main() {
  var wait sync.WaitGroup
  // Chỉ định số lượng goroutine con
  wait.Add(1)
  go func() {
    fmt.Println(1)
    // Thực thi xong
    wait.Done()
  }()
  // Đợi goroutine con
  wait.Wait()
  fmt.Println(2)
}

Đoạn mã này luôn xuất 1 trước rồi mới xuất 2 goroutine chính sẽ đợi goroutine con thực thi xong rồi mới thoát.

1
2

Đối với ví dụ đầu tiên trong phần giới thiệu về goroutine có thể sửa đổi như sau

go
func main() {
   var mainWait sync.WaitGroup
   var wait sync.WaitGroup
   // Đếm 10
   mainWait.Add(10)
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      // Trong vòng lặp đếm 1
      wait.Add(1)
      go func() {
         fmt.Println(i)
         // Hai đếm -1
         wait.Done()
         mainWait.Done()
      }()
      // Đợi goroutine trong vòng lặp hiện tại thực thi xong
      wait.Wait()
   }
   // Đợi tất cả goroutine thực thi xong
   mainWait.Wait()
   fmt.Println("end")
}

Ở đây sử dụng sync.WaitGroup thay thế cho time.Sleep trước đó thứ tự thực thi đồng thời của goroutine có thể kiểm soát hơn bất kể thực thi bao nhiêu lần kết quả xuất đều như sau

start
0
1
2
3
4
5
6
7
8
9
end

WaitGroup thường phù hợp khi số lượng goroutine có thể điều chỉnh động ví dụ biết trước số lượng goroutine hoặc cần điều chỉnh động trong quá trình chạy. Giá trị của WaitGroup không nên được sao chép giá trị sau khi sao chép cũng không nên tiếp tục sử dụng đặc biệt khi truyền nó làm tham số hàm nên truyền con trỏ thay vì giá trị. Nếu sử dụng giá trị sao chép đếm hoàn toàn không thể tác dụng lên WaitGroup thật điều này có thể khiến goroutine chính chặn mãi chương trình sẽ không thể chạy bình thường. Ví dụ như mã dưới đây

go
func main() {
  var mainWait sync.WaitGroup
  mainWait.Add(1)
  hello(mainWait)
  mainWait.Wait()
  fmt.Println("end")
}
func hello(wait sync.WaitGroup) {
  fmt.Println("hello")
  wait.Done()
}

Thông báo lỗi tất cả goroutine đã thoát nhưng goroutine chính vẫn đang đợi như vậy đã hình thành deadlock vì gọi Done bên trong hàm hello đối với tham số hình thức WaitGroup sẽ không tác dụng lên mainWait ban đầu vì vậy nên sử dụng con trỏ để truyền.

hello
fatal error: all goroutines are asleep - deadlock!

TIP

Khi đếm trở thành số âm hoặc số lượng đếm lớn hơn số lượng goroutine con sẽ gây ra panic.

Context

Context dịch là ngữ cảnh là một giải pháp kiểm soát đồng thời mà Go cung cấp so với ống dẫn và WaitGroup nó có thể kiểm soát tốt hơn goroutine con cháu và các goroutine có层级 sâu hơn. Context bản thân là một interface chỉ cần thực hiện interface này đều có thể gọi là ngữ cảnh ví dụ gin.Context trong framework Web nổi tiếng Gin. Thư viện chuẩn context cũng cung cấp một vài triển khai分别是

  • emptyCtx
  • cancelCtx
  • timerCtx
  • valueCtx

Context

Trước tiên xem định nghĩa của interface Context rồi tìm hiểu triển khai cụ thể của nó.

go
type Context interface {

   Deadline() (deadline time.Time, ok bool)

   Done() <-chan struct{}

   Err() error

   Value(key any) any
}

Deadline

Phương thức này có hai giá trị trả về deadline là thời hạn chót tức là thời điểm ngữ cảnh nên bị hủy. Giá trị thứ hai là có đặt deadline hay không nếu không đặt thì luôn là false.

go
Deadline() (deadline time.Time, ok bool)

Done

Giá trị trả về của nó là một ống dẫn chỉ đọc loại struct rỗng ống dẫn này chỉ có tác dụng thông báo không truyền bất kỳ dữ liệu nào. Khi công việc mà ngữ cảnh làm nên bị hủy ống dẫn này sẽ bị đóng đối với một số ngữ cảnh không hỗ trợ hủy có thể trả về nil.

go
Done() <-chan struct{}

Err

Phương thức này trả về một error dùng để biểu thị lý do đóng ngữ cảnh. Khi ống dẫn Done chưa đóng trả về nil nếu sau khi đóng sẽ trả về một err để giải thích tại sao đóng.

go
Err() error

Value

Phương thức này trả về giá trị khóa tương ứng nếu key không tồn tại hoặc không hỗ trợ phương thức này sẽ trả về nil.

go
Value(key any) any

emptyCtx

Đúng như tên gọi emptyCtx là ngữ cảnh rỗng tất cả triển khai trong gói context đều không expose ra bên ngoài nhưng cung cấp các hàm tương ứng để tạo ngữ cảnh. emptyCtx có thể được tạo thông qua context.Backgroundcontext.TODO. Hai hàm như sau

go
var (
  background = new(emptyCtx)
  todo       = new(emptyCtx)
)

func Background() Context {
  return background
}

func TODO() Context {
  return todo
}

Có thể thấy chỉ đơn giản là trả về con trỏ emptyCtx. Loại底层 của emptyCtx thực tế là một int lý do không sử dụng struct rỗng là vì instance của emptyCtx phải có địa chỉ bộ nhớ khác nhau nó không thể bị hủy không có deadline cũng không thể lấy giá trị các phương thức được thực hiện đều trả về giá trị 0.

go
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
   return
}

func (*emptyCtx) Done() <-chan struct{} {
   return nil
}

func (*emptyCtx) Err() error {
   return nil
}

func (*emptyCtx) Value(key any) any {
   return nil
}

emptyCtx thường được dùng làm ngữ cảnh顶层 nhất khi tạo ba loại ngữ cảnh khác được truyền làm ngữ cảnh cha. Mối quan hệ giữa các triển khai trong gói context như hình dưới

valueCtx

Triển khai của valueCtx khá đơn giản bên trong chỉ chứa một cặp key-value và một trường kiểu Context nhúng.

go
type valueCtx struct {
   Context
   key, val any
}

Bản thân nó chỉ thực hiện phương thức Value logic cũng rất đơn giản không tìm thấy trong ngữ cảnh hiện tại thì đi tìm trong ngữ cảnh cha.

go
func (c *valueCtx) Value(key any) any {
   if c.key == key {
      return c.val
   }
   return value(c.Context, key)
}

Dưới đây xem một trường hợp sử dụng đơn giản của valueCtx

go
var waitGroup sync.WaitGroup

func main() {
  waitGroup.Add(1)
    // Truyền ngữ cảnh
  go Do(context.WithValue(context.Background(), 1, 2))
  waitGroup.Wait()
}

func Do(ctx context.Context) {
    // Tạo timer mới
  ticker := time.NewTimer(time.Second)
  defer waitGroup.Done()
  for {
    select {
    case <-ctx.Done(): // Mãi mãi cũng không thực thi
    case <-ticker.C:
      fmt.Println("timeout")
      return
    default:
      fmt.Println(ctx.Value(1))
    }
    time.Sleep(time.Millisecond * 100)
  }
}

valueCtx thường dùng để truyền một số dữ liệu trong nhiều cấp goroutine không thể bị hủy vì vậy ctx.Done mãi mãi sẽ trả về nil select sẽ bỏ qua ống dẫn nil. Cuối cùng kết quả xuất như sau

2
2
2
2
2
2
2
2
2
2
timeout

cancelCtx

cancelCtxtimerCtx đều thực hiện interface canceler loại interface như sau

go
type canceler interface {
    // removeFromParent biểu thị có xóa bản thân khỏi ngữ cảnh cha hay không
    // err biểu thị lý do hủy
  cancel(removeFromParent bool, err error)
    // Done trả về một ống dẫn dùng để thông báo lý do hủy
  Done() <-chan struct{}
}

Phương thức cancel không expose ra bên ngoài khi tạo ngữ cảnh được đóng gói trong giá trị trả về thông qua closure để gọi bên ngoài như trong mã nguồn context.WithCancel所示

go
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   c := newCancelCtx(parent)
   // Cố gắng thêm bản thân vào children của cha
   propagateCancel(parent, &c)
   // Trả về context và một hàm
   return &c, func() { c.cancel(true, Canceled) }
}

cancelCtx dịch là ngữ cảnh có thể hủy khi tạo nếu cha thực hiện canceler sẽ thêm bản thân vào children của cha nếu không sẽ tiếp tục tìm kiếm lên trên. Nếu tất cả các cha đều không thực hiện canceler sẽ khởi động một goroutine đợi cha hủy rồi khi cha kết thúc sẽ hủy ngữ cảnh hiện tại. Khi gọi cancelFunc ống dẫn Done sẽ bị đóng bất kỳ con nào của ngữ cảnh này cũng sẽ bị hủy theo cuối cùng sẽ xóa bản thân khỏi cha. Dưới đây là một ví dụ đơn giản

go
var waitGroup sync.WaitGroup

func main() {
  bkg := context.Background()
    // Trả về một cancelCtx và hàm cancel
  cancelCtx, cancel := context.WithCancel(bkg)
  waitGroup.Add(1)
  go func(ctx context.Context) {
    defer waitGroup.Done()
    for {
      select {
      case <-ctx.Done():
        fmt.Println(ctx.Err())
        return
      default:
        fmt.Println("đang đợi hủy...")
      }
      time.Sleep(time.Millisecond * 200)
    }

  }(cancelCtx)
  time.Sleep(time.Second)
  cancel()
  waitGroup.Wait()
}

Kết quả xuất như sau

đang đợi hủy...
đang đợi hủy...
đang đợi hủy...
đang đợi hủy...
đang đợi hủy...
context canceled

Thêm một ví dụ层级嵌套 sâu hơn

go
var waitGroup sync.WaitGroup

func main() {
   waitGroup.Add(3)
   ctx, cancelFunc := context.WithCancel(context.Background())
   go HttpHandler(ctx)
   time.Sleep(time.Second)
   cancelFunc()
   waitGroup.Wait()
}

func HttpHandler(ctx context.Context) {
   cancelCtxAuth, cancelAuth := context.WithCancel(ctx)
   cancelCtxMail, cancelMail := context.WithCancel(ctx)

   defer cancelAuth()
   defer cancelMail()
   defer waitGroup.Done()

   go AuthService(cancelCtxAuth)
   go MailService(cancelCtxMail)

   for {
      select {
      case <-ctx.Done():
         fmt.Println(ctx.Err())
         return
      default:
         fmt.Println("đang xử lý yêu cầu http...")
      }
      time.Sleep(time.Millisecond * 200)
   }

}

func AuthService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("auth cha hủy", ctx.Err())
         return
      default:
         fmt.Println("auth...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

func MailService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("mail cha hủy", ctx.Err())
         return
      default:
         fmt.Println("mail...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

Ví dụ này tạo 3 cancelCtx mặc dù ngữ cảnh cha cancelCtx khi hủy đồng thời sẽ hủy ngữ cảnh con của nó nhưng để an toàn nếu tạo một cancelCtx sau khi quy trình tương ứng kết thúc nên gọi hàm cancel. Kết quả xuất như sau

đang xử lý yêu cầu http...
auth...
mail...
mail...
auth...
đang xử lý yêu cầu http...
auth...
mail...
đang xử lý yêu cầu http...
đang xử lý yêu cầu http...
auth...
mail...
auth...
đang xử lý yêu cầu http...
mail...
context canceled
auth cha hủy context canceled
mail cha hủy context canceled

timerCtx

timerCtx trên cơ sở của cancelCtx thêm cơ chế timeout thư viện context cung cấp hai hàm tạo分别是 WithDeadlineWithTimeout chức năng của cả hai tương tự cái trước là chỉ định một thời gian timeout cụ thể ví dụ chỉ định một thời gian cụ thể 2023/3/20 16:32:00 cái sau là chỉ định một khoảng thời gian timeout ví dụ 5 phút sau. Chữ ký của hai hàm như sau

go
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

timerCtx sẽ tự động hủy ngữ cảnh hiện tại sau khi thời gian hết hạn quy trình hủy ngoài việc đóng timer thêm về cơ bản giống với cancelCtx. Dưới đây là một ví dụ sử dụng đơn giản của timerCtx

go
var wait sync.WaitGroup

func main() {
  deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
  defer cancel()
  wait.Add(1)
  go func(ctx context.Context) {
    defer wait.Done()
    for {
      select {
      case <-ctx.Done():
        fmt.Println("ngữ cảnh hủy", ctx.Err())
        return
      default:
        fmt.Println("đang đợi hủy...")
      }
      time.Sleep(time.Millisecond * 200)
    }
  }(deadline)
  wait.Wait()
}

Mặc dù ngữ cảnh hết hạn sẽ tự động hủy nhưng để an toàn sau khi quy trình liên quan kết thúc tốt nhất nên hủy ngữ cảnh thủ công. Kết quả xuất như sau

đang đợi hủy...
đang đợi hủy...
đang đợi hủy...
đang đợi hủy...
đang đợi hủy...
ngữ cảnh hủy context deadline exceeded

WithTimeout thực tế rất giống với WithDeadline triển khai của nó cũng chỉ hơi đóng gói một chút rồi gọi WithDeadline cách sử dụng giống với ví dụ WithDeadline ở trên như sau

go
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
   return WithDeadline(parent, time.Now().Add(timeout))
}

TIP

Cũng giống như phân bổ bộ nhớ mà không thu hồi sẽ gây rò rỉ bộ nhớ context cũng là một loại tài nguyên nếu tạo nhưng không bao giờ hủy cũng sẽ gây rò rỉ context vì vậy tốt nhất nên tránh tình huống này.

Select

select trong hệ thống Linux là một giải pháp IO multiplexing tương tự trong Go select là một cấu trúc kiểm soát pipe multiplexing. Multiplexing là gì nói đơn giản bằng một câu tại một thời điểm đồng thời giám sát nhiều phần tử có sẵn hay không phần tử được giám sát có thể là yêu cầu mạng IO file v.v. Phần tử mà select trong Go giám sát chính là ống dẫn và chỉ có thể là ống dẫn. Cú pháp của select giống với câu lệnh switch dưới đây xem một câu lệnh select trông như thế nào

go
func main() {
  // Tạo ba ống dẫn
  chA := make(chan int)
  chB := make(chan int)
  chC := make(chan int)
  defer func() {
    close(chA)
    close(chB)
    close(chC)
  }()
  select {
  case n, ok := <-chA:
    fmt.Println(n, ok)
  case n, ok := <-chB:
    fmt.Println(n, ok)
  case n, ok := <-chC:
    fmt.Println(n, ok)
  default:
    fmt.Println("tất cả ống dẫn đều không khả dụng")
  }
}

Sử dụng

Giống với switch select gồm nhiều case và một default nhánh default có thể bỏ qua. Mỗi case chỉ có thể thao tác một ống dẫn và chỉ có thể thực hiện một thao tác hoặc đọc hoặc ghi khi có nhiều case khả dụng select sẽ giả ngẫu nhiên chọn một case để thực thi. Nếu tất cả case đều không khả dụng sẽ thực thi nhánh default nếu không có nhánh default sẽ chặn đợi cho đến khi ít nhất một case khả dụng. Vì trong ví dụ trên không ghi dữ liệu vào ống dẫn tự nhiên tất cả case đều không khả dụng vì vậy cuối cùng xuất kết quả thực thi của nhánh default. Sửa đổi một chút như sau

go
func main() {
   chA := make(chan int)
   chB := make(chan int)
   chC := make(chan int)
   defer func() {
      close(chA)
      close(chB)
      close(chC)
   }()
   // Khởi động một goroutine mới
   go func() {
      // Ghi dữ liệu vào ống dẫn A
      chA <- 1
   }()
   select {
   case n, ok := <-chA:
      fmt.Println(n, ok)
   case n, ok := <-chB:
      fmt.Println(n, ok)
   case n, ok := <-chC:
      fmt.Println(n, ok)
   }
}

Ví dụ trên khởi động một goroutine mới để ghi dữ liệu vào ống dẫn A select vì không có nhánh mặc định nên sẽ chặn đợi cho đến khi có case khả dụng. Khi ống dẫn A khả dụng sau khi thực thi xong nhánh tương ứng goroutine chính sẽ thoát trực tiếp. Muốn luôn giám sát ống dẫn có thể phối hợp sử dụng với vòng lặp for như sau.

go
func main() {
  chA := make(chan int)
  chB := make(chan int)
  chC := make(chan int)
  defer func() {
    close(chA)
    close(chB)
    close(chC)
  }()
  go Send(chA)
  go Send(chB)
  go Send(chC)
  // Vòng lặp for
  for {
    select {
    case n, ok := <-chA:
      fmt.Println("A", n, ok)
    case n, ok := <-chB:
      fmt.Println("B", n, ok)
    case n, ok := <-chC:
      fmt.Println("C", n, ok)
    }
  }
}

func Send(ch chan<- int) {
  for i := 0; i < 3; i++ {
    time.Sleep(time.Millisecond)
    ch <- i
  }
}

Như vậy thực sự cả ba ống dẫn đều có thể sử dụng được nhưng vòng lặp vô tận + select sẽ khiến goroutine chính chặn vĩnh viễn vì vậy có thể đặt nó vào goroutine mới và thêm một số logic khác.

go
func main() {
  chA := make(chan int)
  chB := make(chan int)
  chC := make(chan int)
  defer func() {
    close(chA)
    close(chB)
    close(chC)
  }()

  l := make(chan struct{})

  go Send(chA)
  go Send(chB)
  go Send(chC)

  go func() {
  Loop:
    for {
      select {
      case n, ok := <-chA:
        fmt.Println("A", n, ok)
      case n, ok := <-chB:
        fmt.Println("B", n, ok)
      case n, ok := <-chC:
        fmt.Println("C", n, ok)
      case <-time.After(time.Second): // Đặt thời gian timeout 1 giây
        break Loop // Thoát khỏi vòng lặp
      }
    }
    l <- struct{}{} // Báo cho goroutine chính có thể thoát
  }()

  <-l
}

func Send(ch chan<- int) {
  for i := 0; i < 3; i++ {
    time.Sleep(time.Millisecond)
    ch <- i
  }
}

Ví dụ trên thông qua vòng lặp for phối hợp với select để luôn giám sát ba ống dẫn có khả dụng hay không và case thứ tư là một ống dẫn timeout sau khi timeout sẽ thoát khỏi vòng lặp kết thúc goroutine con. Cuối cùng kết quả xuất như sau

C 0 true
A 0 true
B 0 true
A 1 true
B 1 true
C 1 true
B 2 true
C 2 true
A 2 true

Timeout

Ví dụ trước sử dụng hàm time.After giá trị trả về của nó là một ống dẫn chỉ đọc hàm này phối hợp với select sử dụng có thể rất đơn giản thực hiện cơ chế timeout ví dụ như sau

go
func main() {
  chA := make(chan int)
  defer close(chA)
  go func() {
    time.Sleep(time.Second * 2)
    chA <- 1
  }()
  select {
  case n := <-chA:
    fmt.Println(n)
  case <-time.After(time.Second):
    fmt.Println("timeout")
  }
}

Chặn vĩnh viễn

Khi trong câu lệnh select không có gì sẽ chặn vĩnh viễn ví dụ

go
func main() {
  fmt.Println("start")
  select {}
  fmt.Println("end")
}

end mãi mãi cũng không được xuất goroutine chính sẽ chặn mãi trường hợp này thường có mục đích đặc biệt.

TIP

Trong case của select thao tác trên ống dẫn có giá trị nil sẽ không dẫn đến chặn case đó sẽ bị bỏ qua mãi mãi không được thực thi. Ví dụ mã dưới đây bất kể thực thi bao nhiêu lần cũng chỉ xuất timeout.

go
func main() {
   var nilCh chan int
   select {
   case <-nilCh:
      fmt.Println("read")
   case nilCh <- 1:
      fmt.Println("write")
   case <-time.After(time.Second):
      fmt.Println("timeout")
   }
}

Không chặn

Thông qua việc sử dụng nhánh default của select phối hợp với ống dẫn chúng ta có thể thực hiện thao tác gửi nhận không chặn như dưới đây

go
func TrySend(ch chan int, ele int) bool  {
	select {
	case ch <- ele:
		return true
	default:
		return false
	}
}

func TryRecv(ch chan int) (int, bool)  {
	select {
	case ele, ok := <-ch:
		return ele, ok
	default:
		return 0, false
	}
}

Tương tự cũng có thể thực hiện判断 không chặn một context có kết thúc hay không

go
func IsDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

Khóa

Trước tiên xem một ví dụ

go
var wait sync.WaitGroup
var count = 0

func main() {
   wait.Add(10)
   for i := 0; i < 10; i++ {
      go func(data *int) {
         // Mô phỏng thời gian truy cập
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         // Truy cập dữ liệu
         temp := *data
         // Mô phỏng thời gian tính toán
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         ans := 1
         // Sửa đổi dữ liệu
         *data = temp + ans
         fmt.Println(*data)
         wait.Done()
      }(&count)
   }
   wait.Wait()
   fmt.Println("kết quả cuối cùng", count)
}

Đối với ví dụ trên khởi động mười goroutine để thực hiện thao tác +1 trên count và sử dụng time.Sleep để mô phỏng thời gian xử lý khác nhau theo trực giác mà nói 10 goroutine thực thi 10 thao tác +1 kết quả cuối cùng chắc chắn là 10 kết quả đúng thực sự cũng là 10 nhưng thực tế không phải vậy kết quả thực thi của ví dụ trên như sau

1
2
3
3
2
2
3
3
3
4
kết quả cuối cùng 4

Có thể thấy kết quả cuối cùng là 4 và đây chỉ là một trong nhiều kết quả có thể. Do thời gian truy cập và tính toán của mỗi goroutine khác nhau goroutine A truy cập dữ liệu tốn 500ms lúc này truy cập到的 giá trị count là 1 sau đó lại tốn 400ms để tính toán nhưng trong 400ms này goroutine B đã hoàn thành truy cập và tính toán và thành công cập nhật giá trị count goroutine A sau khi tính toán xong giá trị mà goroutine A truy cập ban đầu đã lỗi thời nhưng goroutine A không biết điều này vẫn cộng thêm một trên cơ sở giá trị truy cập ban đầu và gán cho count như vậy kết quả thực thi của goroutine B bị ghi đè. Khi nhiều goroutine đọc và truy cập một dữ liệu chia sẻ đặc biệt sẽ xảy ra vấn đề như vậy vì vậy cần sử dụng khóa.

MutexRWMutex trong gói sync của Go cung cấp hai triển khai khóa loại trừ lẫn nhau và khóa đọc ghi và cung cấp API rất đơn giản dễ sử dụng khóa chỉ cần Lock() mở khóa cũng chỉ cần Unlock(). Cần lưu ý khóa mà Go cung cấp đều là khóa không đệ quy tức là khóa không thể vào lại vì vậy khóa lại hoặc mở khóa lại đều sẽ dẫn đến fatal. Ý nghĩa của khóa là bảo vệ bất biến khóa là hy vọng dữ liệu không bị goroutine khác sửa đổi như sau

go
func DoSomething() {
  Lock()
    // Trong quá trình này dữ liệu không bị goroutine khác sửa đổi
  Unlock()
}

Nếu là khóa đệ quy có thể xảy ra tình huống như sau

go
func DoSomething() {
  Lock()
    DoOther()
  Unlock()
}

func DoOther() {
  Lock()
  // làm gì đó
  Unlock()
}

Hàm DoSomthing rõ ràng không biết hàm DoOther có thể làm gì với dữ liệu từ đó sửa đổi dữ liệu ví dụ mở thêm vài goroutine con phá vỡ bất biến. Điều này không ổn trong Go một khi đã khóa phải đảm bảo tính bất biến của bất biến lúc này khóa lại mở khóa lại đều sẽ dẫn đến deadlock. Vì vậy khi viết mã nên tránh tình huống trên cần thiết đồng thời sử dụng câu lệnh defer để mở khóa ngay khi khóa.

Khóa loại trừ lẫn nhau

sync.Mutex là triển khai khóa loại trừ lẫn nhau mà Go cung cấp nó thực hiện interface sync.Locker

go
type Locker interface {
   // Khóa
   Lock()
   // Mở khóa
   Unlock()
}

Sử dụng khóa loại trừ lẫn nhau có thể giải quyết hoàn hảo vấn đề trên ví dụ như sau

go
var wait sync.WaitGroup
var count = 0

var lock sync.Mutex

func main() {
  wait.Add(10)
  for i := 0; i < 10; i++ {
    go func(data *int) {
      // Khóa
      lock.Lock()
      // Mô phỏng thời gian truy cập
      time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
      // Truy cập dữ liệu
      temp := *data
      // Mô phỏng thời gian tính toán
      time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
      ans := 1
      // Sửa đổi dữ liệu
      *data = temp + ans
      // Mở khóa
      lock.Unlock()
      fmt.Println(*data)
      wait.Done()
    }(&count)
  }
  wait.Wait()
  fmt.Println("kết quả cuối cùng", count)
}

Mỗi goroutine trước khi truy cập dữ liệu đều khóa trước sau khi cập nhật xong mở khóa goroutine khác muốn truy cập phải giành được khóa trước nếu không sẽ chặn đợi. Như vậy sẽ không tồn tại vấn đề trên vì vậy kết quả xuất như sau

1
2
3
4
5
6
7
8
9
10
kết quả cuối cùng 10

Khóa đọc ghi

Khóa loại trừ lẫn nhau phù hợp với trường hợp tần suất đọc và viết đều tương đương đối với một số dữ liệu đọc nhiều viết ít nếu sử dụng khóa loại trừ lẫn nhau sẽ gây ra cạnh tranh khóa không cần thiết cho nhiều goroutine điều này sẽ tiêu tốn nhiều tài nguyên hệ thống lúc này cần sử dụng khóa đọc ghi tức là khóa đọc ghi loại trừ lẫn nhau đối với một goroutine

  • Nếu giành được khóa đọc goroutine khác thực hiện thao tác viết sẽ chặn goroutine khác thực hiện thao tác đọc sẽ không chặn
  • Nếu giành được khóa viết goroutine khác thực hiện thao tác viết sẽ chặn goroutine khác thực hiện thao tác đọc sẽ chặn

Triển khai khóa đọc ghi loại trừ lẫn nhau trong Go là sync.RWMutex nó cũng thực hiện interface Locker nhưng cung cấp nhiều phương thức khả dụng hơn như sau

go
// Khóa đọc
func (rw *RWMutex) RLock()

// Cố gắng khóa đọc
func (rw *RWMutex) TryRLock() bool

// Mở khóa đọc
func (rw *RWMutex) RUnlock()

// Khóa viết
func (rw *RWMutex) Lock()

// Cố gắng khóa viết
func (rw *RWMutex) TryLock() bool

// Mở khóa viết
func (rw *RWMutex) Unlock()

Trong đó hai thao tác cố gắng khóa TryRlockTryLock là không chặn thành công khóa sẽ trả về true khi không thể giành được khóa sẽ không chặn mà trả về false. Triển khai bên trong của khóa đọc ghi vẫn là khóa loại trừ lẫn nhau không phải nói có khóa đọc và khóa viết là có hai khóa từ đầu đến cuối chỉ có một khóa. Dưới đây xem một trường hợp sử dụng khóa đọc ghi loại trừ lẫn nhau

go
var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

func main() {
  wait.Add(12)
  // Đọc nhiều viết ít
  go func() {
    for i := 0; i < 3; i++ {
      go Write(&count)
    }
    wait.Done()
  }()
  go func() {
    for i := 0; i < 7; i++ {
      go Read(&count)
    }
    wait.Done()
  }()
  // Đợi goroutine con kết thúc
  wait.Wait()
  fmt.Println("kết quả cuối cùng", count)
}

func Read(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
  rw.RLock()
  fmt.Println("giành được khóa đọc")
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  fmt.Println("giải phóng khóa đọc", *i)
  rw.RUnlock()
  wait.Done()
}

func Write(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  rw.Lock()
  fmt.Println("giành được khóa viết")
  temp := *i
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  *i = temp + 1
  fmt.Println("giải phóng khóa viết", *i)
  rw.Unlock()
  wait.Done()
}

Ví dụ này khởi động 3 goroutine viết 7 goroutine đọc khi đọc dữ liệu đều giành được khóa đọc trước goroutine đọc có thể bình thường giành được khóa đọc nhưng sẽ chặn goroutine viết khi giành được khóa viết thì sẽ đồng thời chặn goroutine đọc và goroutine viết cho đến khi giải phóng khóa viết như vậy thực hiện goroutine đọc và goroutine viết loại trừ lẫn nhau đảm bảo tính chính xác của dữ liệu. Kết quả xuất của ví dụ như sau

giành được khóa đọc
giành được khóa đọc
giành được khóa đọc
giành được khóa đọc
giải phóng khóa đọc 0
giải phóng khóa đọc 0
giải phóng khóa đọc 0
giải phóng khóa đọc 0
giành được khóa viết
giải phóng khóa viết 1
giành được khóa đọc
giành được khóa đọc
giành được khóa đọc
giải phóng khóa đọc 1
giải phóng khóa đọc 1
giải phóng khóa đọc 1
giành được khóa viết
giải phóng khóa viết 2
giành được khóa viết
giải phóng khóa viết 3
kết quả cuối cùng 3

TIP

Đối với khóa không nên truyền và lưu trữ dưới dạng giá trị nên sử dụng con trỏ.

Biến điều kiện

Biến điều kiện xuất hiện và sử dụng cùng với khóa loại trừ lẫn nhau vì vậy một số người có thể hiểu nhầm gọi là khóa điều kiện nhưng nó không phải là khóa mà là một cơ chế giao tiếp. sync.Cond trong Go cung cấp triển khai cho điều này và chữ ký hàm tạo biến điều kiện như sau

go
func NewCond(l Locker) *Cond

Có thể thấy tiền đề tạo một biến điều kiện là cần tạo một khóa sync.Cond cung cấp các phương thức sau để sử dụng

go
// Chặn đợi điều kiện có hiệu lực cho đến khi được đánh thức
func (c *Cond) Wait()

// Đánh thức một goroutine bị chặn do điều kiện
func (c *Cond) Signal()

// Đánh thức tất cả goroutine bị chặn do điều kiện
func (c *Cond) Broadcast()

Biến điều kiện sử dụng rất đơn giản chỉ cần sửa đổi một chút ví dụ khóa đọc ghi ở trên là được

go
var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

// Biến điều kiện
var cond = sync.NewCond(rw.RLocker())

func main() {
  wait.Add(12)
  // Đọc nhiều viết ít
  go func() {
    for i := 0; i < 3; i++ {
      go Write(&count)
    }
    wait.Done()
  }()
  go func() {
    for i := 0; i < 7; i++ {
      go Read(&count)
    }
    wait.Done()
  }()
  // Đợi goroutine con kết thúc
  wait.Wait()
  fmt.Println("kết quả cuối cùng", count)
}

func Read(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
  rw.RLock()
  fmt.Println("giành được khóa đọc")
  // Điều kiện không thỏa mãn sẽ chặn mãi
  for *i < 3 {
    cond.Wait()
  }
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  fmt.Println("giải phóng khóa đọc", *i)
  rw.RUnlock()
  wait.Done()
}

func Write(i *int) {
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  rw.Lock()
  fmt.Println("giành được khóa viết")
  temp := *i
  time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
  *i = temp + 1
  fmt.Println("giải phóng khóa viết", *i)
  rw.Unlock()
  // Đánh thức tất cả goroutine bị chặn do biến điều kiện
  cond.Broadcast()
  wait.Done()
}

Khi tạo biến điều kiện vì ở đây biến điều kiện tác dụng lên goroutine đọc nên truyền khóa đọc làm khóa loại trừ lẫn nhau nếu truyền trực tiếp khóa đọc ghi loại trừ lẫn nhau sẽ dẫn đến vấn đề goroutine viết mở khóa lại. Ở đây truyền vào là sync.rlocker giành được thông qua phương thức RWMutex.RLocker.

go
func (rw *RWMutex) RLocker() Locker {
   return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

Có thể thấy rlocker cũng chỉ đóng gói thao tác khóa đọc của khóa đọc ghi loại trừ lẫn nhau thực tế là cùng một tham chiếu vẫn là cùng một khóa. Khi goroutine đọc đọc dữ liệu nếu nhỏ hơn 3 sẽ chặn mãi cho đến khi dữ liệu lớn hơn 3 còn goroutine viết sau khi cập nhật dữ liệu đều cố gắng đánh thức tất cả goroutine bị chặn do biến điều kiện vì vậy kết quả xuất cuối cùng như sau

giành được khóa đọc
giành được khóa đọc
giành được khóa đọc
giành được khóa đọc
giành được khóa viết
giải phóng khóa viết 1
giành được khóa đọc
giành được khóa viết
giải phóng khóa viết 2
giành được khóa đọc
giành được khóa đọc
giành được khóa viết
giải phóng khóa viết 3 // Goroutine viết thứ ba thực thi xong
giải phóng khóa đọc 3
giải phóng khóa đọc 3
giải phóng khóa đọc 3
giải phóng khóa đọc 3
giải phóng khóa đọc 3
giải phóng khóa đọc 3
giải phóng khóa đọc 3
kết quả cuối cùng 3

Từ kết quả có thể thấy khi goroutine viết thứ ba cập nhật xong dữ liệu bảy goroutine đọc bị chặn do biến điều kiện đều khôi phục chạy.

TIP

Đối với biến điều kiện nên sử dụng for thay vì if nên sử dụng vòng lặp để判断 điều kiện có thỏa mãn hay không vì khi goroutine được đánh thức không thể đảm bảo điều kiện hiện tại đã thỏa mãn.

go
for !condition {
  cond.Wait()
}

sync

Một phần lớn các công cụ liên quan đến đồng thời trong Go được cung cấp bởi thư viện chuẩn sync trên đã giới thiệu qua sync.WaitGroup sync.Locker v.v. ngoài ra gói sync còn có một số công cụ khác có thể sử dụng.

Once

Khi sử dụng một số cấu trúc dữ liệu nếu những cấu trúc dữ liệu này quá cồng kềnh có thể cân nhắc sử dụng cách lười biếng tức là chỉ khởi tạo cấu trúc dữ liệu này khi thực sự cần sử dụng nó. Như ví dụ dưới đây

go
type MySlice []int

func (m *MySlice) Get(i int) (int, bool) {
   if *m == nil {
      return 0, false
   } else {
      return (*m)[i], true
   }
}

func (m *MySlice) Add(i int) {
   // Khi thực sự cần sử dụng slice mới cân nhắc khởi tạo
   if *m == nil {
      *m = make([]int, 0, 10)
   }
   *m = append(*m, i)
}

Vấn đề là nếu chỉ có một goroutine sử dụng chắc chắn không có vấn đề gì nhưng nếu có nhiều goroutine truy cập thì có thể xảy ra vấn đề. Ví dụ goroutine A và B đồng thời gọi phương thức Add A thực thi nhanh hơn một chút đã khởi tạo xong và thành công thêm dữ liệu sau đó goroutine B lại khởi tạo một lần nữa như vậy sẽ ghi đè trực tiếp dữ liệu mà goroutine A thêm đây là vấn đề.

Và đây là vấn đề mà sync.Once cần giải quyết đúng như tên gọi Once dịch là một lần sync.Once đảm bảo dưới điều kiện đồng thời thao tác chỉ định chỉ thực thi một lần. Nó sử dụng rất đơn giản chỉ expose một phương thức Do ra bên ngoài chữ ký như sau

go
func (o *Once) Do(f func())

Khi sử dụng chỉ cần truyền thao tác khởi tạo vào phương thức Do là được như sau

go
var wait sync.WaitGroup

func main() {
  var slice MySlice
  wait.Add(4)
  for i := 0; i < 4; i++ {
    go func() {
      slice.Add(1)
      wait.Done()
    }()
  }
  wait.Wait()
  fmt.Println(slice.Len())
}

type MySlice struct {
  s []int
  o sync.Once
}

func (m *MySlice) Get(i int) (int, bool) {
  if m.s == nil {
    return 0, false
  } else {
    return m.s[i], true
  }
}

func (m *MySlice) Add(i int) {
  // Khi thực sự cần sử dụng slice mới cân nhắc khởi tạo
  m.o.Do(func() {
    fmt.Println("khởi tạo")
    if m.s == nil {
      m.s = make([]int, 0, 10)
    }
  })
  m.s = append(m.s, i)
}

func (m *MySlice) Len() int {
  return len(m.s)
}

Kết quả xuất như sau

khởi tạo
4

Từ kết quả xuất có thể thấy tất cả dữ liệu đều được thêm vào slice bình thường thao tác khởi tạo chỉ thực thi một lần. Thực ra triển khai của sync.Once khá đơn giản loại bỏ chú thích logic mã thật sự chỉ có 16 dòng nguyên lý của nó là khóa + thao tác nguyên tử. Mã nguồn như sau

go
type Once struct {
    // Dùng để判断 thao tác đã thực thi chưa
  done uint32
  m    Mutex
}

func (o *Once) Do(f func()) {
  // Nguyên tử load dữ liệu
  if atomic.LoadUint32(&o.done) == 0 {
    o.doSlow(f)
  }
}

func (o *Once) doSlow(f func()) {
    // Khóa
  o.m.Lock()
    // Mở khóa
  defer o.m.Unlock()
    //判断 có thực thi không
  if o.done == 0 {
        // Sau khi thực thi xong sửa đổi done
    defer atomic.StoreUint32(&o.done, 1)
    f()
  }
}

Pool

Mục đích thiết kế của sync.Pool là dùng để lưu trữ đối tượng tạm thời để tái sử dụng sau này là một pool đối tượng tạm thời an toàn đồng thời đặt đối tượng tạm thời không dùng đến vào pool sau này sử dụng không cần tạo thêm đối tượng có thể tái sử dụng trực tiếp giảm tần suất phân bổ và giải phóng bộ nhớ điểm quan trọng nhất là giảm áp lực GC. sync.Pool tổng cộng chỉ có hai phương thức như sau

go
// Xin một đối tượng
func (p *Pool) Get() any

// Put một đối tượng
func (p *Pool) Put(x any)

sync.Pool có một trường New expose ra bên ngoài dùng để khởi tạo một đối tượng khi pool không xin được đối tượng

go
New func() any

Dưới đây demo bằng một ví dụ

go
var wait sync.WaitGroup

// Pool đối tượng tạm thời
var pool sync.Pool

// Dùng để đếm trong quá trình tổng cộng tạo bao nhiêu đối tượng
var numOfObject atomic.Int64

// BigMemData Giả sử đây là một struct chiếm bộ nhớ lớn
type BigMemData struct {
   M string
}

func main() {
   pool.New = func() any {
      numOfObject.Add(1)
      return BigMemData{"bộ nhớ lớn"}
   }
   wait.Add(1000)
   // Ở đây khởi động 1000 goroutine
   for i := 0; i < 1000; i++ {
      go func() {
         // Xin đối tượng
         val := pool.Get()
         // Sử dụng đối tượng
         _ = val.(BigMemData)
         // Sau khi sử dụng xong giải phóng đối tượng
         pool.Put(val)
         wait.Done()
      }()
   }
   wait.Wait()
   fmt.Println(numOfObject.Load())
}

Ví dụ này khởi động 1000 goroutine liên tục xin và giải phóng đối tượng trong pool nếu không sử dụng pool thì 1000 goroutine đều cần各自 khởi tạo đối tượng và 1000 đối tượng sau khi khởi tạo xong đều cần GC giải phóng bộ nhớ nếu có hàng chục vạn goroutine hoặc chi phí tạo đối tượng này rất cao trong trường hợp này sẽ chiếm rất nhiều bộ nhớ và gây áp lực rất lớn cho GC sau khi sử dụng pool có thể tái sử dụng đối tượng giảm tần suất khởi tạo ví dụ kết quả xuất của ví dụ trên có thể như sau

5

Cho dù khởi động 1000 goroutine trong toàn bộ quá trình chỉ tạo 5 đối tượng nếu không sử dụng pool 1000 goroutine sẽ tạo 1000 đối tượng mức độ tối ưu hóa mang lại là hiển nhiên đặc biệt là khi lượng đồng thời特别 lớn và chi phí khởi tạo đối tượng特别 cao càng có thể thể hiện ưu thế.

Khi sử dụng sync.Pool cần lưu ý một vài điểm

  • Đối tượng tạm thời sync.Pool chỉ phù hợp lưu trữ đối tượng tạm thời đối tượng trong pool có thể bị GC xóa bỏ mà không có bất kỳ thông báo nào vì vậy không nên lưu trữ kết nối mạng kết nối cơ sở dữ liệu loại này vào sync.Pool.
  • Không thể dự đoán sync.Pool khi xin đối tượng không thể dự đoán đối tượng này là mới tạo hay tái sử dụng cũng không thể biết trong pool có mấy đối tượng
  • An toàn đồng thời官方 đảm bảo sync.Pool nhất định an toàn đồng thời nhưng không đảm bảo hàm New dùng để tạo đối tượng nhất định an toàn đồng thời hàm New do người sử dụng truyền vào vì vậy tính an toàn đồng thời của hàm New phải do người sử dụng tự bảo trì đây cũng là lý do tại sao trong ví dụ trên đếm đối tượng sử dụng giá trị nguyên tử.

TIP

Cuối cùng cần lưu ý khi sử dụng xong đối tượng nhất định phải giải phóng lại vào pool nếu dùng mà không giải phóng thì việc sử dụng pool đối tượng sẽ vô nghĩa.

Gói fmt trong thư viện chuẩn có một trường hợp sử dụng pool đối tượng trong hàm fmt.Fprintf

go
func Fprintf(w io.Writer, format string, a ...any) (n int, err error) {
   // Xin một buffer in
   p := newPrinter()
   p.doPrintf(format, a)
   n, err = w.Write(p.buf)
   // Giải phóng sau khi sử dụng xong
   p.free()
   return
}

Trong đó triển khai của hàm newPointer và phương thức free như sau

go
func newPrinter() *pp {
   // Một đối tượng xin từ pool đối tượng
   p := ppFree.Get().(*pp)
   p.panicking = false
   p.erroring = false
   p.wrapErrs = false
   p.fmt.init(&p.buf)
   return p
}

func (p *pp) free() {
    // Để kích thước buffer trong pool đối tượng đại khái giống nhau để kiểm soát buffer đàn hồi tốt hơn
    // Buffer quá lớn就不用放回对象池
  if cap(p.buf) > 64<<10 {
    return
  }
  // Reset trường rồi giải phóng đối tượng vào pool
  p.buf = p.buf[:0]
  p.arg = nil
  p.value = reflect.Value{}
  p.wrappedErr = nil
  ppFree.Put(p)
}

Map

sync.Map là một triển khai Map an toàn đồng thời do官方 cung cấp mở hộp là dùng sử dụng rất đơn giản dưới đây là các phương thức mà struct này expose ra bên ngoài

go
// Đọc giá trị theo một key giá trị trả về sẽ trả về giá trị tương ứng và giá trị đó có tồn tại hay không
func (m *Map) Load(key any) (value any, ok bool)

// Lưu trữ một cặp key-value
func (m *Map) Store(key, value any)

// Xóa một cặp key-value
func (m *Map) Delete(key any)

// Nếu key đã tồn tại thì trả về giá trị cũ nếu không thì lưu giá trị mới vào và trả về khi thành công đọc được giá trị loaded là true nếu không là false
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)

// Xóa một cặp key-value và trả về giá trị cũ loaded phụ thuộc vào key có tồn tại hay không
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)

//遍历 Map khi f() trả về false sẽ dừng遍历
func (m *Map) Range(f func(key, value any) bool)

Dưới đây dùng một ví dụ đơn giản để demo cách sử dụng cơ bản của sync.Map

go
func main() {
  var syncMap sync.Map
  // Lưu dữ liệu
  syncMap.Store("a", 1)
  syncMap.Store("a", "a")
  // Đọc dữ liệu
  fmt.Println(syncMap.Load("a"))
  // Đọc và xóa
  fmt.Println(syncMap.LoadAndDelete("a"))
  // Đọc hoặc lưu
  fmt.Println(syncMap.LoadOrStore("a", "hello world"))
  syncMap.Store("b", "goodbye world")
  //遍历 map
  syncMap.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true
  })
}

Kết quả xuất

a true
a true
hello world false
a hello world
b goodbye world

Tiếp theo xem một ví dụ sử dụng map đồng thời

go
func main() {
  myMap := make(map[int]int, 10)
  var wait sync.WaitGroup
  wait.Add(10)
  for i := 0; i < 10; i++ {
    go func(n int) {
      for i := 0; i < 100; i++ {
        myMap[n] = n
      }
      wait.Done()
    }(i)
  }
  wait.Wait()
}

Ví dụ trên sử dụng map thông thường mở 10 goroutine liên tục lưu dữ liệu rõ ràng điều này rất có thể kích hoạt fatal kết quả rất có thể như sau

fatal error: concurrent map writes

Sử dụng sync.Map có thể tránh vấn đề này

go
func main() {
  var syncMap sync.Map
  var wait sync.WaitGroup
  wait.Add(10)
  for i := 0; i < 10; i++ {
    go func(n int) {
      for i := 0; i < 100; i++ {
        syncMap.Store(n, n)
      }
      wait.Done()
    }(i)
  }
  wait.Wait()
  syncMap.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true
  })
}

Kết quả xuất như sau

8 8
3 3
1 1
9 9
6 6
5 5
7 7
0 0
2 2
4 4

Để an toàn đồng thời chắc chắn cần hy sinh nhất định hiệu suất của sync.Map thấp hơn map khoảng 10-100 lần.

Nguyên tử

Trong khoa học máy tính nguyên tử hoặc thao tác nguyên thủy thường dùng để mô tả một số thao tác không thể chia nhỏ hơn vì những thao tác này không thể chia nhỏ thành các bước nhỏ hơn trước khi thực thi xong sẽ không bị bất kỳ goroutine nào khác gián đoạn vì vậy kết quả thực thi hoặc thành công hoặc thất bại không có tình huống thứ ba nếu xuất hiện tình huống khác thì nó không phải là thao tác nguyên tử. Ví dụ mã dưới đây

go
func main() {
  a := 0
  if a == 0 {
    a = 1
  }
  fmt.Println(a)
}

Mã trên là một nhánh判断 đơn giản mặc dù mã rất ít nhưng cũng không phải là thao tác nguyên tử thao tác nguyên tử thật sự do cấp độ chỉ dẫn phần cứng hỗ trợ.

Loại

May mắn thay trong hầu hết trường hợp không cần tự viết assembly gói sync/atomic trong thư viện chuẩn Go đã cung cấp API liên quan đến thao tác nguyên tử nó cung cấp các loại sau để thực hiện thao tác nguyên tử.

go
atomic.Bool{}
atomic.Pointer[]{}
atomic.Int32{}
atomic.Int64{}
atomic.Uint32{}
atomic.Uint64{}
atomic.Uintptr{}
atomic.Value{}

Trong đó loại nguyên tử Pointer hỗ trợ generic loại Value hỗ trợ lưu trữ bất kỳ loại nào ngoài ra còn cung cấp nhiều hàm để thao tác thuận tiện. Vì độ hạt của thao tác nguyên tử quá mịn trong hầu hết trường hợp phù hợp hơn để xử lý những dữ liệu cơ bản này.

TIP

Thao tác nguyên tử trong gói atmoic chỉ có chữ ký hàm không có triển khai cụ thể triển khai cụ thể do plan9 assembly viết.

Sử dụng

Mỗi loại nguyên tử đều cung cấp ba phương thức sau

  • Load(): nguyên tử lấy giá trị
  • Swap(newVal type) (old type): nguyên tử hoán đổi giá trị và trả về giá trị cũ
  • Store(val type): nguyên tử lưu giá trị

Các loại khác nhau có thể có thêm phương thức bổ sung khác ví dụ loại số nguyên đều cung cấp phương thức Add để thực hiện thao tác cộng trừ nguyên tử. Dưới đây lấy loại int64 làm ví dụ demo

go
func main() {
  var aint64 atomic.Uint64
  // Lưu giá trị
  aint64.Store(64)
  // Hoán đổi giá trị
  aint64.Swap(128)
  // Tăng
  aint64.Add(112)
    // Load giá trị
  fmt.Println(aint64.Load())
}

Hoặc cũng có thể sử dụng hàm trực tiếp

go
func main() {
   var aint64 int64
   // Lưu giá trị
   atomic.StoreInt64(&aint64, 64)
   // Hoán đổi giá trị
   atomic.SwapInt64(&aint64, 128)
   // Tăng
   atomic.AddInt64(&aint64, 112)
   // Load
   fmt.Println(atomic.LoadInt64(&aint64))
}

Cách sử dụng của các loại khác cũng rất tương tự cuối cùng kết quả xuất là

240

CAS

Gói atomic còn cung cấp thao tác CompareAndSwap tức là CAS. Nó là cốt lõi để thực hiện khóa lạc quan và cấu trúc dữ liệu không khóa. Bản thân khóa lạc quan không phải là khóa mà là một cách kiểm soát đồng thời không khóa dưới điều kiện đồng thời thread/goroutine trước khi sửa đổi dữ liệu sẽ không khóa trước mà đọc dữ liệu trước thực hiện tính toán rồi khi提交 sửa đổi sử dụng CAS để判断 trong thời gian này có thread khác sửa đổi dữ liệu này hay không. Nếu không (giá trị vẫn bằng giá trị đọc trước đó) thì sửa đổi thành công nếu không thì thất bại và thử lại. Vì vậy lý do được gọi là khóa lạc quan là vì nó luôn lạc quan giả định dữ liệu chia sẻ không bị sửa đổi chỉ khi phát hiện dữ liệu không bị sửa đổi mới thực hiện thao tác tương ứng còn khóa loại trừ lẫn nhau đã tìm hiểu trước đó là khóa bi quan khóa loại trừ lẫn nhau luôn bi quan cho rằng dữ liệu chia sẻ chắc chắn bị sửa đổi vì vậy khi thao tác sẽ khóa sau khi thao tác xong sẽ mở khóa. Do đồng thời không khóa được thực hiện tính an toàn và hiệu quả so với khóa cao hơn nhiều cấu trúc dữ liệu an toàn đồng thời đều sử dụng CAS để thực hiện tuy nhiên hiệu quả thật sự phải kết hợp với tình huống sử dụng cụ thể mà xem. Xem một ví dụ dưới đây

go
var lock sync.Mutex

var count int

func Add(num int) {
   lock.Lock()
   count += num
   lock.Unlock()
}

Đây là một ví dụ sử dụng khóa loại trừ lẫn nhau mỗi lần tăng số đều khóa trước sau khi thực thi xong sẽ mở khóa trong quá trình sẽ dẫn đến goroutine khác chặn tiếp theo sử dụng CAS cải tạo một chút

go
var count int64

func Add(num int64) {
  for {
    expect := atomic.LoadInt64(&count)
    if atomic.CompareAndSwapInt64(&count, expect, expect+num) {
      break
    }
  }
}

Đối với CAS có ba tham số giá trị bộ nhớ giá trị kỳ vọng giá trị mới. Khi thực thi CAS sẽ so sánh giá trị kỳ vọng với giá trị bộ nhớ hiện tại nếu giá trị bộ nhớ giống với giá trị kỳ vọng sẽ thực hiện thao tác tiếp theo nếu không thì không làm gì cả. Đối với thao tác nguyên tử trong gói atomic của Go hàm liên quan đến CAS cần truyền địa chỉ giá trị kỳ vọng giá trị mới và sẽ trả về giá trị bool có thay thế thành công hay không. Ví dụ chữ ký hàm thao tác CAS của loại int64 như sau

go
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

Trong ví dụ CAS trước tiên sẽ thông qua LoadInt64 để lấy giá trị kỳ vọng sau đó sử dụng CompareAndSwapInt64 để so sánh hoán đổi nếu không thành công thì liên tục vòng lặp cho đến khi thành công. Thao tác không khóa như vậy tuy không dẫn đến goroutine chặn nhưng liên tục vòng lặp đối với CPU vẫn là một chi phí không nhỏ vì vậy trong một số triển khai khi thất bại đạt đến một số lần có thể từ bỏ thao tác. Nhưng đối với thao tác trên chỉ đơn giản là cộng số liên quan đến thao tác không phức tạp vì vậy hoàn toàn có thể cân nhắc thực hiện không khóa.

TIP

Trong hầu hết trường hợp chỉ so sánh giá trị không thể đảm bảo an toàn đồng thời ví dụ vấn đề ABA do CAS gây ra cần sử dụng thêm version để giải quyết vấn đề.

Value

Struct atomic.Value có thể lưu trữ giá trị bất kỳ loại struct như sau

go
type Value struct {
   // Loại any
   v any
}

Mặc dù có thể lưu trữ bất kỳ loại nhưng nó không thể lưu trữ nil và giá trị lưu trữ trước sau nên nhất quán hai ví dụ dưới đây đều không thể biên dịch

go
func main() {
   var val atomic.Value
   val.Store(nil)
   fmt.Println(val.Load())
}
// panic: sync/atomic: store of nil value into Value
go
func main() {
   var val atomic.Value
   val.Store("hello world")
   val.Store(114514)
   fmt.Println(val.Load())
}
// panic: sync/atomic: store of inconsistently typed value into Value

Ngoài ra cách sử dụng của nó không khác biệt lớn so với các loại nguyên tử khác và cần lưu ý tất cả các loại nguyên tử không nên sao chép giá trị mà nên sử dụng con trỏ của chúng.

Golang by www.golangdev.cn edit