Go の並行処理におけるエラー伝播(errgroup)

goroutine を複数起動して並行処理を行うとき、エラーの扱いは途端に複雑になる。どの goroutine がエラーを返したのか、1 つが失敗したら他の goroutine はどうするのか、すべての goroutine の完了をどう待つのか――これらを自前で管理しようとすると、チャネルと WaitGroup を組み合わせた煩雑なコードになりがちだ。golang.org/x/sync/errgroup パッケージは、この問題を簡潔に解決する。

goroutine のエラー処理が難しい理由

まず、errgroup を使わずに複数の goroutine からエラーを集める素朴な実装を見てみよう。

func fetchAll(urls []string) error {
	var wg sync.WaitGroup
	errCh := make(chan error, len(urls))

	for _, url := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done()
			resp, err := http.Get(u)
			if err != nil {
				errCh <- err
				return
			}
			resp.Body.Close()
		}(url)
	}

	wg.Wait()
	close(errCh)

	for err := range errCh {
		return err // 最初の 1 つだけ返す
	}
	return nil
}

WaitGroup で完了を待ち、バッファ付きチャネルでエラーを収集し、チャネルを閉じてからエラーを取り出す。動きはするが、問題が複数ある。エラーが複数あっても最初の 1 つしか返せない、1 つの goroutine が失敗しても他は走り続ける、チャネルのバッファサイズを間違えるとデッドロックの恐れがある――といった具合だ。

errgroup の基本

errgroup はこれらの問題を構造的に解消してくれる。

import "golang.org/x/sync/errgroup"

func fetchAll(urls []string) error {
	var g errgroup.Group

	for _, url := range urls {
		g.Go(func() error {
			resp, err := http.Get(url)
			if err != nil {
				return err
			}
			resp.Body.Close()
			return nil
		})
	}

	return g.Wait()
}

WaitGroup もチャネルも消えた。g.Go で goroutine を起動し、g.Wait で全 goroutine の完了を待つ。いずれかの goroutine がエラーを返せば、Wait がそのエラーを返す。複数の goroutine がエラーを返した場合は、最初に発生したエラーが返される。

WaitGroup + チャネル

goroutine の完了待ち、エラー収集、チャネルの開閉をすべて自前で管理する。コードが長くなり、バグの入り込む余地が多い

errgroup

g.Go でエラーを返す goroutine を起動し、g.Wait で待つだけ。エラーの収集と完了待ちを内部で安全に処理する

context によるキャンセル連携

errgroup.WithContext を使うと、1 つの goroutine がエラーを返した時点で他の goroutine にキャンセルを通知できる。

func fetchAll(ctx context.Context, urls []string) error {
	g, ctx := errgroup.WithContext(ctx)

	for _, url := range urls {
		g.Go(func() error {
			req, err := http.NewRequestWithContext(
				ctx, "GET", url, nil,
			)
			if err != nil {
				return fmt.Errorf("create request %s: %w", url, err)
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return fmt.Errorf("fetch %s: %w", url, err)
			}
			resp.Body.Close()
			return nil
		})
	}

	return g.Wait()
}

errgroup.WithContext が返す ctx は、いずれかの goroutine がエラーを返した時点でキャンセルされる。各 goroutine が http.NewRequestWithContext でこの ctx を使っていれば、キャンセルされた時点でリクエストが中断される。

goroutine A がエラーを返す

errgroup が内部の context をキャンセル

goroutine B, C の ctx.Done() が発火

残りの goroutine が処理を中断して終了

キャンセルを正しく伝播させるには、各 goroutine 内で ctx を参照する処理を使う必要がある。http.NewRequestWithContext は ctx のキャンセルを監視するが、ctx を無視する処理を書いていると、キャンセル通知を受け取れずに最後まで走り続けてしまう。

並行数の制限

外部 API への同時リクエスト数を制限したい場合、errgroup.SetLimit が使える。

func fetchAll(ctx context.Context, urls []string) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(5) // 同時に最大 5 goroutine

	for _, url := range urls {
		g.Go(func() error {
			req, err := http.NewRequestWithContext(
				ctx, "GET", url, nil,
			)
			if err != nil {
				return fmt.Errorf("create request %s: %w", url, err)
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return fmt.Errorf("fetch %s: %w", url, err)
			}
			resp.Body.Close()
			return nil
		})
	}

	return g.Wait()
}

SetLimit(5) により、g.Go で起動される goroutine は同時に最大 5 つまでに制限される。6 つ目以降の g.Go 呼び出しは、いずれかの goroutine が完了するまでブロックする仕組みだ。これにより、レートリミットのある外部 API への過剰なリクエストを防げる。

実践的な例:複数ファイルの並行処理

ファイルの読み込みと処理を並行で行い、エラーがあれば即座に中断する実践的な例を見てみよう。

type Result struct {
	Path    string
	Summary string
}

func processFiles(
	ctx context.Context,
	paths []string,
) ([]Result, error) {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(3)

	var mu sync.Mutex
	results := make([]Result, 0, len(paths))

	for _, path := range paths {
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			data, err := os.ReadFile(path)
			if err != nil {
				return fmt.Errorf("read %s: %w", path, err)
			}

			summary := summarize(data)

			mu.Lock()
			results = append(results, Result{
				Path:    path,
				Summary: summary,
			})
			mu.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

いくつかのポイントがある。まず results スライスへの追加は複数の goroutine から行われるため、sync.Mutex で保護している。goroutine の冒頭で ctx.Done() を確認し、既にキャンセル済みなら処理を開始しないようにしている点も重要だ。SetLimit(3) によりディスク I/O の並行数を抑え、過度な負荷を避けている。

select 文で ctx.Done() を確認しているのは、g.Go 呼び出し時点では goroutine の起動がキューに入るだけで、実際に実行されるのが遅れる可能性があるためだ。

SetLimit により goroutine の起動が待たされている間に、別の goroutine がエラーを返して ctx がキャンセルされている場合がある。

errgroup を使うべきでない場面

errgroup は万能ではない。以下のような場面では別のアプローチが適している。

すべてのエラーを収集したい場合

errgroup は最初のエラーしか返さない。全 goroutine のエラーをまとめて返したいなら、チャネルやスライス+ Mutex で自前収集する必要がある。

goroutine 間でデータを受け渡す場合

errgroup はエラーの伝播に特化しており、goroutine 間の値のやり取りは管轄外だ。パイプラインパターンが必要なら、チャネルを使った設計の方が適している。

長時間動くワーカープール

errgroup は「全 goroutine が終了するまで待つ」設計であり、常駐するワーカーの管理には向かない。サーバーのバックグラウンドワーカーには、専用のワーカープール実装を使うのが一般的だ。

エラーメッセージに文脈を含める

errgroup を使うとき、どの goroutine がエラーを返したのかを判別するために、エラーメッセージに文脈情報を含めることが重要になる。Wait が返すのは最初のエラー 1 つだけなので、それだけで原因を特定できるようにしておく必要がある。

// 悪い例:どの URL で失敗したか分からない
g.Go(func() error {
	_, err := http.Get(url)
	return err
})

// 良い例:URL を含めてラップ
g.Go(func() error {
	_, err := http.Get(url)
	if err != nil {
		return fmt.Errorf("fetch %s: %w", url, err)
	}
	return nil
})

並行処理では複数の goroutine が同時に動いているため、エラーメッセージだけが問題の特定手段になる場面が多い。fmt.Errorf で操作内容と対象を含めておくことで、ログを見ただけで原因箇所を絞り込めるようになる。