Python の ThreadPoolExecutor でスレッドプールを使う
ThreadPoolExecutor は、スレッドプールを簡単に管理できるクラスです。concurrent.futures モジュールに含まれており、スレッドの作成・管理を抽象化してくれます。
基本的な使い方
ThreadPoolExecutor を使うと、スレッドプールにタスクを投入し、結果を受け取れます。
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(1)
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 5)
result = future.result() # 結果を待機して取得
print(f"結果: {result}") # 結果: 25with 文を使うと、ブロック終了時に自動的にスレッドプールがシャットダウンされます。
submit() と Future
submit() はタスクを投入し、Future オブジェクトを返します。Future から結果や例外を取得できます。
from concurrent.futures import ThreadPoolExecutor
def divide(a, b):
return a / b
with ThreadPoolExecutor() as executor:
future1 = executor.submit(divide, 10, 2)
future2 = executor.submit(divide, 10, 0)
print(future1.result()) # 5.0
try:
print(future2.result())
except ZeroDivisionError as e:
print(f"エラー: {e}")map() で複数のタスクを実行
map() を使うと、イテラブルの各要素に対して関数を並列実行できます。
from concurrent.futures import ThreadPoolExecutor
import time
def process(n):
time.sleep(1)
return n * 2
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process, range(8)))
print(results) # [0, 2, 4, 6, 8, 10, 12, 14]
# 4並列なので約2秒で完了map() の結果は入力の順序を保持します。
as_completed() で完了順に処理
as_completed() を使うと、タスクが完了した順に結果を取得できます。
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def task(n):
sleep_time = random.uniform(0.5, 2)
time.sleep(sleep_time)
return n, sleep_time
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(task, i): i for i in range(5)}
for future in as_completed(futures):
n, sleep_time = future.result()
print(f"Task {n} 完了 ({sleep_time:.2f}秒)")Future のメソッド
result(timeout=None)
結果を取得。タスク完了まで待機。
exception(timeout=None)
発生した例外を取得。例外がなければ None。
done()
タスクが完了したかどうか。
cancel()
タスクのキャンセルを試みる(実行中は不可)。
cancelled()
キャンセルされたかどうか。
タイムアウト
result() や map() にタイムアウトを指定できます。
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def slow_task():
time.sleep(10)
return "完了"
with ThreadPoolExecutor() as executor:
future = executor.submit(slow_task)
try:
result = future.result(timeout=2)
except TimeoutError:
print("タイムアウト")実用例:並列ダウンロード
複数の URL を並列でダウンロードする例です。
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
def download(url):
start = time.time()
response = urllib.request.urlopen(url)
data = response.read()
elapsed = time.time() - start
return url, len(data), elapsed
urls = [
"https://example.com",
"https://example.org",
"https://example.net",
]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(download, url) for url in urls]
for future in as_completed(futures):
url, size, elapsed = future.result()
print(f"{url}: {size} bytes ({elapsed:.2f}秒)")コールバックの登録
Future にコールバックを登録して、完了時に自動実行させることもできます。
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
def callback(future):
print(f"完了: {future.result()}")
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 5)
future.add_done_callback(callback)ThreadPoolExecutor を使えば、低レベルのスレッド管理を気にせずに並列処理を実装できます。



