Python の ThreadPoolExecutor でスレッドプールを使う

ThreadPoolExecutor は、スレッドプールを簡単に管理できるクラスです。concurrent.futures モジュールに含まれており、スレッドの作成・管理を抽象化してくれます。

基本的な使い方

ThreadPoolExecutor を使うと、スレッドプールにタスクを投入し、結果を受け取れます。

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 5)
    result = future.result()  # 結果を待機して取得
    print(f"結果: {result}")  # 結果: 25

with 文を使うと、ブロック終了時に自動的にスレッドプールがシャットダウンされます。

submit() と Future

submit() はタスクを投入し、Future オブジェクトを返します。Future から結果や例外を取得できます。

from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

with ThreadPoolExecutor() as executor:
    future1 = executor.submit(divide, 10, 2)
    future2 = executor.submit(divide, 10, 0)
    
    print(future1.result())  # 5.0
    
    try:
        print(future2.result())
    except ZeroDivisionError as e:
        print(f"エラー: {e}")

map() で複数のタスクを実行

map() を使うと、イテラブルの各要素に対して関数を並列実行できます。

from concurrent.futures import ThreadPoolExecutor
import time

def process(n):
    time.sleep(1)
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process, range(8)))

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]
# 4並列なので約2秒で完了

map() の結果は入力の順序を保持します。

as_completed() で完了順に処理

as_completed() を使うと、タスクが完了した順に結果を取得できます。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def task(n):
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return n, sleep_time

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(task, i): i for i in range(5)}
    
    for future in as_completed(futures):
        n, sleep_time = future.result()
        print(f"Task {n} 完了 ({sleep_time:.2f}秒)")

Future のメソッド

result(timeout=None)

結果を取得。タスク完了まで待機。

exception(timeout=None)

発生した例外を取得。例外がなければ None。

done()

タスクが完了したかどうか。

cancel()

タスクのキャンセルを試みる(実行中は不可)。

cancelled()

キャンセルされたかどうか。

タイムアウト

result()map() にタイムアウトを指定できます。

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task():
    time.sleep(10)
    return "完了"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    try:
        result = future.result(timeout=2)
    except TimeoutError:
        print("タイムアウト")

実用例:並列ダウンロード

複数の URL を並列でダウンロードする例です。

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time

def download(url):
    start = time.time()
    response = urllib.request.urlopen(url)
    data = response.read()
    elapsed = time.time() - start
    return url, len(data), elapsed

urls = [
    "https://example.com",
    "https://example.org",
    "https://example.net",
]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(download, url) for url in urls]
    
    for future in as_completed(futures):
        url, size, elapsed = future.result()
        print(f"{url}: {size} bytes ({elapsed:.2f}秒)")

コールバックの登録

Future にコールバックを登録して、完了時に自動実行させることもできます。

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

def callback(future):
    print(f"完了: {future.result()}")

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    future.add_done_callback(callback)

ThreadPoolExecutor を使えば、低レベルのスレッド管理を気にせずに並列処理を実装できます。