Go の channel パターン:fan-out と fan-in

並行処理のパターンとして「fan-out / fan-in」があります。fan-out は 1 つの入力を複数の goroutine に分散させ、fan-in は複数の出力を 1 つに集約します。処理を並列化してスループットを向上させる基本的なテクニックです。

fan-out とは

fan-out は、1 つの channel からデータを読み取り、複数の goroutine で並列に処理することです。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d: 処理中 %d\n", id, job)
        time.Sleep(100 * time.Millisecond) // 重い処理をシミュレート
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 10)
    results := make(chan int, 10)

    // 3 つのワーカーを起動(fan-out)
    for i := 1; i <= 3; i++ {
        go worker(i, jobs, results)
    }

    // ジョブを投入
    for i := 1; i <= 9; i++ {
        jobs <- i
    }
    close(jobs)

    // 結果を収集
    for i := 1; i <= 9; i++ {
        fmt.Println("結果:", <-results)
    }
}

9 個のジョブを 3 つのワーカーで分散処理しています。1 つのワーカーで順番に処理するより約 3 倍速くなります。

fan-in とは

fan-in は、複数の channel からの出力を 1 つの channel に集約することです。

func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    output := func(ch <-chan int) {
        defer wg.Done()
        for v := range ch {
            merged <- v
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go output(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

この関数は任意の数の channel を受け取り、すべての値を 1 つの channel にまとめます。

fan-in の使用例

複数のデータソースからの結果を集約する場面を見てみましょう。

func generator(start, count int) <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < count; i++ {
            ch <- start + i
            time.Sleep(50 * time.Millisecond)
        }
        close(ch)
    }()
    return ch
}

func main() {
    ch1 := generator(1, 5)   // 1, 2, 3, 4, 5
    ch2 := generator(100, 5) // 100, 101, 102, 103, 104
    ch3 := generator(200, 5) // 200, 201, 202, 203, 204

    merged := fanIn(ch1, ch2, ch3)

    for v := range merged {
        fmt.Println(v)
    }
}

3 つの generator が並行して値を生成し、fanIn がそれらを 1 つの channel にまとめます。出力順は不定ですが、すべての値が収集されます。

実践例:Web スクレイピング

複数の URL を並列にフェッチし、結果を集約する例です。

type FetchResult struct {
    URL    string
    Status int
    Err    error
}

func fetch(url string) <-chan FetchResult {
    ch := make(chan FetchResult, 1)
    go func() {
        resp, err := http.Get(url)
        if err != nil {
            ch <- FetchResult{URL: url, Err: err}
        } else {
            ch <- FetchResult{URL: url, Status: resp.StatusCode}
            resp.Body.Close()
        }
        close(ch)
    }()
    return ch
}

func main() {
    urls := []string{
        "https://example.com",
        "https://example.org",
        "https://example.net",
    }

    // 各 URL のフェッチを開始(fan-out)
    var channels []<-chan FetchResult
    for _, url := range urls {
        channels = append(channels, fetch(url))
    }

    // 結果を集約(fan-in)
    for _, ch := range channels {
        result := <-ch
        if result.Err != nil {
            fmt.Printf("%s: エラー %v\n", result.URL, result.Err)
        } else {
            fmt.Printf("%s: %d\n", result.URL, result.Status)
        }
    }
}

3 つの URL を同時にフェッチするため、順番にフェッチするより大幅に速くなります。

処理の流れ

fan-out / fan-in の典型的な流れを図示すると以下のようになります。

入力データ

fan-out で複数ワーカーに分散

各ワーカーで並列処理

fan-in で結果を集約

この構造は MapReduce パターンの基本形でもあります。大量のデータを効率的に処理する際の定番アプローチです。

注意点

fan-out / fan-in を使う際の注意点をまとめます。

ワーカー数の適切な設定

多すぎると context switch のオーバーヘッドが増え、少なすぎると並列化の効果が薄れます。CPU バウンドな処理なら CPU コア数程度、I/O バウンドならより多くても効果的です。

channel のバッファサイズ

バッファなしだとワーカー間で待ち合いが発生しやすくなります。適切なバッファサイズを設定してスループットを向上させましょう。

workload の特性に応じて、ワーカー数やバッファサイズを調整することが重要です。