Go の channel パターン:パイプライン処理

パイプライン処理は、データを複数のステージに分けて流し、各ステージを goroutine で並行に実行するパターンです。Unix のパイプ(cat file | grep pattern | sort)と同じ考え方を Go の channel で実現します。

パイプラインの基本構造

パイプラインは「入力 → 処理A → 処理B → 処理C → 出力」のように、データが各ステージを順番に流れていく構造です。

package main

import "fmt"

// ステージ 1: 数値を生成
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// ステージ 2: 2 倍にする
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

// ステージ 3: 10 を足す
func addTen(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n + 10
        }
        close(out)
    }()
    return out
}

func main() {
    // パイプラインを構築
    ch := generate(1, 2, 3, 4, 5)
    ch = double(ch)
    ch = addTen(ch)

    // 結果を出力
    for v := range ch {
        fmt.Println(v) // 12, 14, 16, 18, 20
    }
}

各ステージは独立した goroutine で動作し、channel を通じてデータを受け渡します。

パイプラインの利点

パイプライン処理には複数の利点があります。

並行実行

各ステージが同時に動作するため、1 つ目のデータがステージ 3 にいる間に、2 つ目のデータがステージ 2 で処理されます。

メモリ効率

すべてのデータを一度にメモリに載せる必要がありません。ストリーミング処理が可能です。

大量のデータを扱う場合、パイプラインはメモリ効率と処理速度の両面で優れています。

実践例:ログ処理パイプライン

ログファイルを読み込み、フィルタリングして集計するパイプラインを考えてみましょう。

type LogEntry struct {
    Timestamp string
    Level     string
    Message   string
}

// ステージ 1: ログを読み込む
func readLogs(lines []string) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        for _, line := range lines {
            parts := strings.SplitN(line, " ", 3)
            if len(parts) == 3 {
                out <- LogEntry{
                    Timestamp: parts[0],
                    Level:     parts[1],
                    Message:   parts[2],
                }
            }
        }
        close(out)
    }()
    return out
}

// ステージ 2: ERROR のみ抽出
func filterErrors(in <-chan LogEntry) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        for entry := range in {
            if entry.Level == "ERROR" {
                out <- entry
            }
        }
        close(out)
    }()
    return out
}

// ステージ 3: フォーマット
func format(in <-chan LogEntry) <-chan string {
    out := make(chan string)
    go func() {
        for entry := range in {
            out <- fmt.Sprintf("[%s] %s", entry.Timestamp, entry.Message)
        }
        close(out)
    }()
    return out
}

このパイプラインは、ログの読み込み、フィルタリング、フォーマットを並行に実行します。

キャンセル対応

長時間動作するパイプラインでは、途中でキャンセルできることが重要です。

func generateWithCancel(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func doubleWithCancel(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * 2:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

context を使うことで、パイプライン全体を安全にキャンセルできます。

使用例

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    ch := generateWithCancel(ctx, 1, 2, 3, 4, 5)
    ch = doubleWithCancel(ctx, ch)

    for v := range ch {
        fmt.Println(v)
        time.Sleep(500 * time.Millisecond)
    }
}

タイムアウトが発生すると、すべてのステージが停止します。

パイプラインの分岐

1 つの入力を複数のパイプラインに分岐させることもできます。

func split(in <-chan int) (<-chan int, <-chan int) {
    out1 := make(chan int)
    out2 := make(chan int)
    go func() {
        defer close(out1)
        defer close(out2)
        for n := range in {
            out1 <- n
            out2 <- n
        }
    }()
    return out1, out2
}

この関数は入力を 2 つの channel に複製します。異なる処理を並列に適用したい場合に便利です。

パイプラインの設計指針

効果的なパイプラインを設計するためのポイントをまとめます。

各ステージは単一の責務を持つ

入力は receive-only channel

出力は send-only channel(関数内で作成)

channel は必ず close する

この規約に従うことで、ステージを自由に組み合わせられる柔軟なパイプラインが構築できます。ステージの追加や入れ替えも容易になります。