Python のジェネレータでパイプライン処理を組む

ジェネレータは単体でも便利だが、複数のジェネレータを数珠つなぎにしたとき真価を発揮する。Unix のパイプのように、データを段階的に変換・フィルタリングしていく構成をパイプライン処理と呼ぶ。

パイプラインの基本構造

パイプラインは「データ源 → 変換 → 変換 → … → 消費」という流れで構成される。各段がジェネレータになっていれば、要素は 1 つずつ流れていくため、全データをメモリに載せる必要がない。

def source(n):
    for i in range(n):
        yield i

def double(upstream):
    for x in upstream:
        yield x * 2

def keep_positive(upstream):
    for x in upstream:
        if x > 0:
            yield x

# パイプラインを組む
pipeline = keep_positive(double(source(5)))

for value in pipeline:
    print(value)
# 2, 4, 6, 8

source が 0〜4 を生成し、double が各値を 2 倍にし、keep_positive が 0 を除外する。関数のネストがそのままデータの流れを表している。

データの流れ方を理解する

このパイプラインでは、すべてのデータが double を通過してから keep_positive に届くわけではない。1 要素ずつ、段階を順に通り抜けていく。

source が 0 を yield する

double が 0 を受け取り 0 を yield する

keep_positive が 0 を受け取るが条件を満たさず捨てる

source に制御が戻り次の 1 を yield する

この「引っ張り型」の実行モデルが重要だ。最終段の for ループが next() を呼ぶたびに、チェーン全体が 1 ステップだけ動く。リスト内包表記で同じ処理を書くと中間リストが毎段生成されるが、ジェネレータパイプラインではそれが起きない。

実用的なログ処理パイプライン

ファイルを 1 行ずつ読み、フィルタリングし、加工して出力するという現実的なパターンを組んでみる。

def read_lines(path):
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")

def grep(pattern, upstream):
    for line in upstream:
        if pattern in line:
            yield line

def cut_fields(delimiter, index, upstream):
    for line in upstream:
        fields = line.split(delimiter)
        if index < len(fields):
            yield fields[index]

def add_prefix(prefix, upstream):
    for x in upstream:
        yield f"{prefix}{x}"

# アクセスログから ERROR 行の日時だけ抽出
pipeline = add_prefix(
    "",
    cut_fields(
        " ", 0,
        grep("ERROR", read_lines("access.log"))
    )
)

for entry in pipeline:
    print(entry)

read_lines がファイルを 1 行ずつ生成し、grep が ERROR を含む行だけ通し、cut_fields がスペース区切りの先頭フィールド(日時)を取り出し、add_prefix が警告マークを付ける。ファイルがどれだけ大きくても、メモリ上には常に 1 行分しか存在しない。

ネストを解消するヘルパー

関数のネストが深くなると読みにくい。パイプラインを左から右へ読めるようにするヘルパーを作ると見通しがよくなる。

from functools import reduce

def pipe(source, *stages):
    return reduce(lambda data, fn: fn(data), stages, source)

# 各段を関数として定義
def double(upstream):
    for x in upstream:
        yield x * 2

def only_even(upstream):
    for x in upstream:
        if x % 2 == 0:
            yield x

def take(n):
    def _take(upstream):
        count = 0
        for x in upstream:
            if count >= n:
                return
            yield x
            count += 1
    return _take

result = pipe(
    range(100),
    double,
    only_even,
    take(5),
)

print(list(result))  # [0, 4, 8, 12, 16]

pipe 関数は reduce で各段を順に適用していく。データの流れが上から下へ一直線に読めるため、段数が増えても構造を把握しやすい。take のように引数が必要な段はクロージャで包む。

この pipe パターンは Unix の cat file | grep ERROR | cut -d' ' -f1 と同じ発想であり、各段が単一責任を持つ小さなジェネレータになっている。

1 つの関数が 1 つの変換だけを担当する設計原則。テストや差し替えが容易になる。

yield from でパイプラインを簡潔にする

変換を加えずに上流をそのまま流す場合や、複数のソースを結合する場合は yield from が便利だ。

def chain(*sources):
    for src in sources:
        yield from src

def flatten(upstream):
    for item in upstream:
        if hasattr(item, "__iter__") and not isinstance(item, str):
            yield from flatten(item)
        else:
            yield item

nested = [[1, [2, 3]], [4, [5, [6]]]]
print(list(flatten(nested)))  # [1, 2, 3, 4, 5, 6]

# 複数ソースの結合
combined = chain(range(3), range(10, 13), range(100, 103))
print(list(combined))  # [0, 1, 2, 10, 11, 12, 100, 101, 102]

flatten はネストされたイテラブルを再帰的に展開する。yield from を使うと、内側のイテラブルの要素を 1 つずつ外側に送り出す処理をループなしで書ける。

パイプラインの分岐と合流

1 つのデータ源を複数の処理に分岐させたい場合、itertools.tee を使う。ただし注意点がある。

from itertools import tee

def source():
    for i in range(5):
        yield i

def to_square(upstream):
    for x in upstream:
        yield x ** 2

def to_cube(upstream):
    for x in upstream:
        yield x ** 3

gen = source()
branch_a, branch_b = tee(gen)

squares = to_square(branch_a)
cubes = to_cube(branch_b)

print(list(squares))  # [0, 1, 4, 9, 16]
print(list(cubes))    # [0, 1, 8, 27, 64]
tee の利点

1 つのジェネレータから複数の独立したイテレータを作れる。各分岐は独立して消費できる。

tee の注意点

片方の分岐だけ先に進むと、もう片方が追いつくまで内部バッファにデータが蓄積される。消費速度に大きな差があるとメモリ効率が悪化する。

エラーハンドリングを組み込む

パイプラインの途中でエラーが発生しても、処理可能な要素だけ流し続けたい場合がある。

def safe_int(upstream):
    for item in upstream:
        try:
            yield int(item)
        except (ValueError, TypeError):
            continue  # 変換できない要素はスキップ

def moving_average(window, upstream):
    buf = []
    for x in upstream:
        buf.append(x)
        if len(buf) > window:
            buf.pop(0)
        if len(buf) == window:
            yield sum(buf) / window

raw_data = ["10", "20", "abc", "30", "40", None, "50"]

pipeline = moving_average(
    3,
    safe_int(raw_data)
)

print(list(pipeline))  # [20.0, 30.0, 40.0]

safe_int が不正な値を静かにスキップし、moving_average がウィンドウ幅 3 の移動平均を計算する。各段が自分の責任範囲だけを処理するため、エラーハンドリングのロジックが散らばらない。

パイプラインとリスト内包表記の比較

同じ処理をリスト内包表記で書いた場合と比較してみる。

ジェネレータパイプライン

メモリ使用量は要素数に依存しない。段ごとに関数が分かれるためテストしやすく、段の追加・削除・並び替えも容易。ただし実行速度はリスト内包表記よりわずかに遅い。

リスト内包表記

中間リストが毎段生成されるため、データ量が大きいとメモリを圧迫する。小規模データなら高速でコードも簡潔。複雑な多段処理になると 1 行が長大になり可読性が下がる。

数百万件を超えるデータや、ファイル・ネットワークからのストリーム入力ではジェネレータパイプラインが有利になる。逆に数百件程度のデータなら、リスト内包表記のほうが素直で速い。

ジェネレータパイプラインでデータが処理される順序として正しいのはどれか?

  • 全データが第 1 段を通過した後、第 2 段の処理が始まる
  • 1 要素ずつ全段を通り抜けてから次の要素が処理される
  • 全段が並列に動作し、同時に処理される
  • 最終段から逆順に処理が進む
__RESULT__

ジェネレータパイプラインは引っ張り型の実行モデルで動く。最終段の消費側が next() を呼ぶたびに、上流のジェネレータが 1 要素分だけ動作し、その要素が全段を通り抜ける。

itertools.tee で分岐を作ったとき、片方の分岐だけを大量に消費するとどうなるか?

  • もう片方の分岐が自動的に追従する
  • もう片方が追いつくまで内部バッファにデータが蓄積される
  • 先に進んだ分岐がブロックされる
  • 消費されなかった分岐のデータは破棄される
__RESULT__

tee は両方の分岐が独立して消費できるよう、先に読まれたデータを内部バッファに保持する。消費速度に差があるとバッファが膨らみ、メモリ効率が悪化する。