Go の channel パターン:fan-out と fan-in
並行処理のパターンとして「fan-out / fan-in」があります。fan-out は 1 つの入力を複数の goroutine に分散させ、fan-in は複数の出力を 1 つに集約します。処理を並列化してスループットを向上させる基本的なテクニックです。
fan-out とは
fan-out は、1 つの channel からデータを読み取り、複数の goroutine で並列に処理することです。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d: 処理中 %d\n", id, job)
time.Sleep(100 * time.Millisecond) // 重い処理をシミュレート
results <- job * 2
}
}
func main() {
jobs := make(chan int, 10)
results := make(chan int, 10)
// 3 つのワーカーを起動(fan-out)
for i := 1; i <= 3; i++ {
go worker(i, jobs, results)
}
// ジョブを投入
for i := 1; i <= 9; i++ {
jobs <- i
}
close(jobs)
// 結果を収集
for i := 1; i <= 9; i++ {
fmt.Println("結果:", <-results)
}
}9 個のジョブを 3 つのワーカーで分散処理しています。1 つのワーカーで順番に処理するより約 3 倍速くなります。
fan-in とは
fan-in は、複数の channel からの出力を 1 つの channel に集約することです。
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
output := func(ch <-chan int) {
defer wg.Done()
for v := range ch {
merged <- v
}
}
wg.Add(len(channels))
for _, ch := range channels {
go output(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}この関数は任意の数の channel を受け取り、すべての値を 1 つの channel にまとめます。
fan-in の使用例
複数のデータソースからの結果を集約する場面を見てみましょう。
func generator(start, count int) <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < count; i++ {
ch <- start + i
time.Sleep(50 * time.Millisecond)
}
close(ch)
}()
return ch
}
func main() {
ch1 := generator(1, 5) // 1, 2, 3, 4, 5
ch2 := generator(100, 5) // 100, 101, 102, 103, 104
ch3 := generator(200, 5) // 200, 201, 202, 203, 204
merged := fanIn(ch1, ch2, ch3)
for v := range merged {
fmt.Println(v)
}
}3 つの generator が並行して値を生成し、fanIn がそれらを 1 つの channel にまとめます。出力順は不定ですが、すべての値が収集されます。
実践例:Web スクレイピング
複数の URL を並列にフェッチし、結果を集約する例です。
type FetchResult struct {
URL string
Status int
Err error
}
func fetch(url string) <-chan FetchResult {
ch := make(chan FetchResult, 1)
go func() {
resp, err := http.Get(url)
if err != nil {
ch <- FetchResult{URL: url, Err: err}
} else {
ch <- FetchResult{URL: url, Status: resp.StatusCode}
resp.Body.Close()
}
close(ch)
}()
return ch
}
func main() {
urls := []string{
"https://example.com",
"https://example.org",
"https://example.net",
}
// 各 URL のフェッチを開始(fan-out)
var channels []<-chan FetchResult
for _, url := range urls {
channels = append(channels, fetch(url))
}
// 結果を集約(fan-in)
for _, ch := range channels {
result := <-ch
if result.Err != nil {
fmt.Printf("%s: エラー %v\n", result.URL, result.Err)
} else {
fmt.Printf("%s: %d\n", result.URL, result.Status)
}
}
}3 つの URL を同時にフェッチするため、順番にフェッチするより大幅に速くなります。
処理の流れ
fan-out / fan-in の典型的な流れを図示すると以下のようになります。
入力データ
fan-out で複数ワーカーに分散
各ワーカーで並列処理
fan-in で結果を集約
この構造は MapReduce パターンの基本形でもあります。大量のデータを効率的に処理する際の定番アプローチです。
注意点
fan-out / fan-in を使う際の注意点をまとめます。
多すぎると context switch のオーバーヘッドが増え、少なすぎると並列化の効果が薄れます。CPU バウンドな処理なら CPU コア数程度、I/O バウンドならより多くても効果的です。
バッファなしだとワーカー間で待ち合いが発生しやすくなります。適切なバッファサイズを設定してスループットを向上させましょう。
workload の特性に応じて、ワーカー数やバッファサイズを調整することが重要です。