ワーカープールで並行処理を効率化する
大量のタスクを処理するとき、タスクごとに goroutine を起動すると数が膨大になることがあります。ワーカープールは、決まった数の goroutine(ワーカー)でタスクを順次処理するパターンです。
ワーカープールの構造
基本的な構造はこうなります。
タスクを channel に投入
複数のワーカーが channel からタスクを受け取る
ワーカーがタスクを処理
結果を別の channel に送信
ワーカーの数を固定することで、リソースの使用量を制御できます。
基本的な実装
3 つのワーカーでジョブを処理する例です。
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d: job %d 開始\n", id, job)
time.Sleep(100 * time.Millisecond) // 処理に時間がかかる想定
results <- job * 2
fmt.Printf("Worker %d: job %d 完了\n", id, job)
}
}
func main() {
jobs := make(chan int, 10)
results := make(chan int, 10)
// 3 つのワーカーを起動
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 5 つのジョブを投入
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 結果を受け取る
for r := 1; r <= 5; r++ {
fmt.Println("結果:", <-results)
}
}jobs <-chan int は受信専用、results chan<- int は送信専用の channel を意味します。こうすることで、ワーカー内で誤って jobs に送信したり、results から受信したりするのを防げます。
バッファ付き channel
この例ではバッファ付き channel を使っています。
jobs := make(chan int, 10)バッファがあると、受信側がいなくてもバッファの容量まで送信できます。ジョブの投入と処理を非同期にできるため、効率が良くなります。
WaitGroup と組み合わせる
結果の数が決まっていない場合や、すべての処理完了を待ちたい場合は WaitGroup を使います。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d: job %d\n", id, job)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
jobs := make(chan int, 10)
var wg sync.WaitGroup
for w := 1; w <= 3; w++ {
wg.Add(1)
go worker(w, jobs, &wg)
}
for j := 1; j <= 10; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
fmt.Println("すべて完了")
}jobs を close すると、ワーカーの for range ループが終了し、Done が呼ばれます。
ワーカー数の決め方
ワーカー数は処理の性質によって変わります。
CPU コア数程度が目安。runtime.NumCPU() で取得できる。
ネットワークやディスク待ちが多いなら、コア数より多くしても効果がある。
実際にはベンチマークを取って調整するのが確実です。
実用的な場面
ワーカープールは次のような場面で役立ちます。
HTTP リクエストを並行で送るとき、同時接続数を制限できます。ファイルの一括処理でも、ディスク I/O の負荷を抑えられます。データベースへのクエリも、コネクションプールと組み合わせて使うことが多いです。
goroutine は軽量ですが、無制限に作るとメモリを圧迫したり、外部リソースに過負荷をかけたりします。ワーカープールで適切に制御することが大切です。