Go のバッファ付き channel で非同期処理を行う
channel を作成する際、バッファサイズを指定することで「バッファ付き channel」を作れます。バッファなしの channel は送信と受信が同時に行われる必要がありますが、バッファ付きではその制約が緩和されます。
バッファなし vs バッファ付き
まずは違いを確認しましょう。
送信側は受信側が受け取るまでブロックする。同期的な通信。
バッファが空いていれば送信側はブロックしない。非同期的な通信が可能。
バッファなし channel は make(chan int) で作成し、バッファ付きは make(chan int, 3) のようにサイズを指定します。
バッファ付き channel の基本
package main
import "fmt"
func main() {
ch := make(chan int, 3) // バッファサイズ 3
ch <- 1
ch <- 2
ch <- 3
// ch <- 4 // これはブロックする(バッファが満杯)
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
}バッファサイズ 3 の channel には、受信側がいなくても 3 つまで値を送信できます。4 つ目を送ろうとすると、バッファに空きができるまでブロックします。
ブロックの挙動を確認する
バッファが満杯になったときの挙動を見てみましょう。
func main() {
ch := make(chan int, 2)
ch <- 1
fmt.Println("1 を送信")
ch <- 2
fmt.Println("2 を送信")
go func() {
time.Sleep(2 * time.Second)
fmt.Println("受信:", <-ch)
}()
ch <- 3 // バッファが満杯なのでブロック
fmt.Println("3 を送信")
}3 を送信しようとした時点でバッファは満杯です。goroutine が 2 秒後に 1 つ受信するまで、送信はブロックされます。
len と cap で状態を確認
len() でバッファ内の要素数、cap() でバッファの容量を確認できます。
func main() {
ch := make(chan string, 5)
ch <- "a"
ch <- "b"
ch <- "c"
fmt.Println("要素数:", len(ch)) // 3
fmt.Println("容量:", cap(ch)) // 5
}この情報を使って、channel の混雑状況を監視することもできます。
実践例:ジョブキュー
バッファ付き channel はジョブキューとして使えます。
func main() {
jobs := make(chan int, 10) // 最大 10 件のジョブを保持
// プロデューサー:ジョブを投入
go func() {
for i := 1; i <= 5; i++ {
jobs <- i
fmt.Println("ジョブ投入:", i)
}
close(jobs)
}()
// コンシューマー:ジョブを処理
for job := range jobs {
fmt.Println("ジョブ処理:", job)
time.Sleep(500 * time.Millisecond)
}
}プロデューサーはバッファに空きがある限りジョブを投入し続けられます。コンシューマーの処理速度に関係なく、一定量のジョブを先に投入できるのがメリットです。
セマフォとしての利用
バッファ付き channel は同時実行数を制限するセマフォとしても使えます。
func main() {
sem := make(chan struct{}, 3) // 同時に 3 つまで
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem <- struct{}{} // セマフォ取得
defer func() { <-sem }() // セマフォ解放
fmt.Printf("タスク %d 開始\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("タスク %d 完了\n", id)
}(i)
}
wg.Wait()
}バッファサイズが 3 なので、同時に実行されるタスクは最大 3 つに制限されます。API のレート制限やリソースの保護に便利です。
バッファサイズの選び方
バッファサイズは大きすぎても小さすぎても問題があります。
送信側が頻繁にブロックし、並行処理の効率が下がる。
メモリを無駄に消費する。また、問題の発見が遅れることがある。
一般的には、プロデューサーとコンシューマーの速度差を吸収できる程度のサイズを設定します。実測して調整するのがベストです。