Go の channel パターン:パイプライン処理
パイプライン処理は、データを複数のステージに分けて流し、各ステージを goroutine で並行に実行するパターンです。Unix のパイプ(cat file | grep pattern | sort)と同じ考え方を Go の channel で実現します。
パイプラインの基本構造
パイプラインは「入力 → 処理A → 処理B → 処理C → 出力」のように、データが各ステージを順番に流れていく構造です。
package main
import "fmt"
// ステージ 1: 数値を生成
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// ステージ 2: 2 倍にする
func double(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
// ステージ 3: 10 を足す
func addTen(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n + 10
}
close(out)
}()
return out
}
func main() {
// パイプラインを構築
ch := generate(1, 2, 3, 4, 5)
ch = double(ch)
ch = addTen(ch)
// 結果を出力
for v := range ch {
fmt.Println(v) // 12, 14, 16, 18, 20
}
}各ステージは独立した goroutine で動作し、channel を通じてデータを受け渡します。
パイプラインの利点
パイプライン処理には複数の利点があります。
各ステージが同時に動作するため、1 つ目のデータがステージ 3 にいる間に、2 つ目のデータがステージ 2 で処理されます。
すべてのデータを一度にメモリに載せる必要がありません。ストリーミング処理が可能です。
大量のデータを扱う場合、パイプラインはメモリ効率と処理速度の両面で優れています。
実践例:ログ処理パイプライン
ログファイルを読み込み、フィルタリングして集計するパイプラインを考えてみましょう。
type LogEntry struct {
Timestamp string
Level string
Message string
}
// ステージ 1: ログを読み込む
func readLogs(lines []string) <-chan LogEntry {
out := make(chan LogEntry)
go func() {
for _, line := range lines {
parts := strings.SplitN(line, " ", 3)
if len(parts) == 3 {
out <- LogEntry{
Timestamp: parts[0],
Level: parts[1],
Message: parts[2],
}
}
}
close(out)
}()
return out
}
// ステージ 2: ERROR のみ抽出
func filterErrors(in <-chan LogEntry) <-chan LogEntry {
out := make(chan LogEntry)
go func() {
for entry := range in {
if entry.Level == "ERROR" {
out <- entry
}
}
close(out)
}()
return out
}
// ステージ 3: フォーマット
func format(in <-chan LogEntry) <-chan string {
out := make(chan string)
go func() {
for entry := range in {
out <- fmt.Sprintf("[%s] %s", entry.Timestamp, entry.Message)
}
close(out)
}()
return out
}このパイプラインは、ログの読み込み、フィルタリング、フォーマットを並行に実行します。
キャンセル対応
長時間動作するパイプラインでは、途中でキャンセルできることが重要です。
func generateWithCancel(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
func doubleWithCancel(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * 2:
case <-ctx.Done():
return
}
}
}()
return out
}context を使うことで、パイプライン全体を安全にキャンセルできます。
使用例
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
ch := generateWithCancel(ctx, 1, 2, 3, 4, 5)
ch = doubleWithCancel(ctx, ch)
for v := range ch {
fmt.Println(v)
time.Sleep(500 * time.Millisecond)
}
}タイムアウトが発生すると、すべてのステージが停止します。
パイプラインの分岐
1 つの入力を複数のパイプラインに分岐させることもできます。
func split(in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
defer close(out1)
defer close(out2)
for n := range in {
out1 <- n
out2 <- n
}
}()
return out1, out2
}この関数は入力を 2 つの channel に複製します。異なる処理を並列に適用したい場合に便利です。
パイプラインの設計指針
効果的なパイプラインを設計するためのポイントをまとめます。
各ステージは単一の責務を持つ
入力は receive-only channel
出力は send-only channel(関数内で作成)
channel は必ず close する
この規約に従うことで、ステージを自由に組み合わせられる柔軟なパイプラインが構築できます。ステージの追加や入れ替えも容易になります。