ワーカープールで並行処理を効率化する

大量のタスクを処理するとき、タスクごとに goroutine を起動すると数が膨大になることがあります。ワーカープールは、決まった数の goroutine(ワーカー)でタスクを順次処理するパターンです。

ワーカープールの構造

基本的な構造はこうなります。

タスクを channel に投入

複数のワーカーが channel からタスクを受け取る

ワーカーがタスクを処理

結果を別の channel に送信

ワーカーの数を固定することで、リソースの使用量を制御できます。

基本的な実装

3 つのワーカーでジョブを処理する例です。

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d: job %d 開始\n", id, job)
        time.Sleep(100 * time.Millisecond) // 処理に時間がかかる想定
        results <- job * 2
        fmt.Printf("Worker %d: job %d 完了\n", id, job)
    }
}

func main() {
    jobs := make(chan int, 10)
    results := make(chan int, 10)

    // 3 つのワーカーを起動
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 5 つのジョブを投入
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)

    // 結果を受け取る
    for r := 1; r <= 5; r++ {
        fmt.Println("結果:", <-results)
    }
}

jobs <-chan int は受信専用、results chan<- int は送信専用の channel を意味します。こうすることで、ワーカー内で誤って jobs に送信したり、results から受信したりするのを防げます。

バッファ付き channel

この例ではバッファ付き channel を使っています。

jobs := make(chan int, 10)

バッファがあると、受信側がいなくてもバッファの容量まで送信できます。ジョブの投入と処理を非同期にできるため、効率が良くなります。

WaitGroup と組み合わせる

結果の数が決まっていない場合や、すべての処理完了を待ちたい場合は WaitGroup を使います。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d: job %d\n", id, job)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    jobs := make(chan int, 10)
    var wg sync.WaitGroup

    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go worker(w, jobs, &wg)
    }

    for j := 1; j <= 10; j++ {
        jobs <- j
    }
    close(jobs)

    wg.Wait()
    fmt.Println("すべて完了")
}

jobs を close すると、ワーカーの for range ループが終了し、Done が呼ばれます。

ワーカー数の決め方

ワーカー数は処理の性質によって変わります。

CPU バウンドな処理

CPU コア数程度が目安。runtime.NumCPU() で取得できる。

I/O バウンドな処理

ネットワークやディスク待ちが多いなら、コア数より多くしても効果がある。

実際にはベンチマークを取って調整するのが確実です。

実用的な場面

ワーカープールは次のような場面で役立ちます。

HTTP リクエストを並行で送るとき、同時接続数を制限できます。ファイルの一括処理でも、ディスク I/O の負荷を抑えられます。データベースへのクエリも、コネクションプールと組み合わせて使うことが多いです。

goroutine は軽量ですが、無制限に作るとメモリを圧迫したり、外部リソースに過負荷をかけたりします。ワーカープールで適切に制御することが大切です。