Go のバッファ付き channel で非同期処理を行う

channel を作成する際、バッファサイズを指定することで「バッファ付き channel」を作れます。バッファなしの channel は送信と受信が同時に行われる必要がありますが、バッファ付きではその制約が緩和されます。

バッファなし vs バッファ付き

まずは違いを確認しましょう。

バッファなし channel

送信側は受信側が受け取るまでブロックする。同期的な通信。

バッファ付き channel

バッファが空いていれば送信側はブロックしない。非同期的な通信が可能。

バッファなし channel は make(chan int) で作成し、バッファ付きは make(chan int, 3) のようにサイズを指定します。

バッファ付き channel の基本

package main

import "fmt"

func main() {
    ch := make(chan int, 3) // バッファサイズ 3

    ch <- 1
    ch <- 2
    ch <- 3
    // ch <- 4 // これはブロックする(バッファが満杯)

    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

バッファサイズ 3 の channel には、受信側がいなくても 3 つまで値を送信できます。4 つ目を送ろうとすると、バッファに空きができるまでブロックします。

ブロックの挙動を確認する

バッファが満杯になったときの挙動を見てみましょう。

func main() {
    ch := make(chan int, 2)

    ch <- 1
    fmt.Println("1 を送信")
    ch <- 2
    fmt.Println("2 を送信")

    go func() {
        time.Sleep(2 * time.Second)
        fmt.Println("受信:", <-ch)
    }()

    ch <- 3 // バッファが満杯なのでブロック
    fmt.Println("3 を送信")
}

3 を送信しようとした時点でバッファは満杯です。goroutine が 2 秒後に 1 つ受信するまで、送信はブロックされます。

len と cap で状態を確認

len() でバッファ内の要素数、cap() でバッファの容量を確認できます。

func main() {
    ch := make(chan string, 5)

    ch <- "a"
    ch <- "b"
    ch <- "c"

    fmt.Println("要素数:", len(ch)) // 3
    fmt.Println("容量:", cap(ch))   // 5
}

この情報を使って、channel の混雑状況を監視することもできます。

実践例:ジョブキュー

バッファ付き channel はジョブキューとして使えます。

func main() {
    jobs := make(chan int, 10) // 最大 10 件のジョブを保持

    // プロデューサー:ジョブを投入
    go func() {
        for i := 1; i <= 5; i++ {
            jobs <- i
            fmt.Println("ジョブ投入:", i)
        }
        close(jobs)
    }()

    // コンシューマー:ジョブを処理
    for job := range jobs {
        fmt.Println("ジョブ処理:", job)
        time.Sleep(500 * time.Millisecond)
    }
}

プロデューサーはバッファに空きがある限りジョブを投入し続けられます。コンシューマーの処理速度に関係なく、一定量のジョブを先に投入できるのがメリットです。

セマフォとしての利用

バッファ付き channel は同時実行数を制限するセマフォとしても使えます。

func main() {
    sem := make(chan struct{}, 3) // 同時に 3 つまで
    var wg sync.WaitGroup

    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            sem <- struct{}{} // セマフォ取得
            defer func() { <-sem }() // セマフォ解放

            fmt.Printf("タスク %d 開始\n", id)
            time.Sleep(1 * time.Second)
            fmt.Printf("タスク %d 完了\n", id)
        }(i)
    }

    wg.Wait()
}

バッファサイズが 3 なので、同時に実行されるタスクは最大 3 つに制限されます。API のレート制限やリソースの保護に便利です。

バッファサイズの選び方

バッファサイズは大きすぎても小さすぎても問題があります。

小さすぎる場合

送信側が頻繁にブロックし、並行処理の効率が下がる。

大きすぎる場合

メモリを無駄に消費する。また、問題の発見が遅れることがある。

一般的には、プロデューサーとコンシューマーの速度差を吸収できる程度のサイズを設定します。実測して調整するのがベストです。