Python の apply_async で非同期にプロセスを実行する
Python の multiprocessing.Pool には、非同期にタスクを実行する apply_async() メソッドがあります。タスクの投入と結果の取得を分離でき、より柔軟な並列処理が可能になります。
apply_async の基本
apply_async() はタスクを投入し、すぐに AsyncResult オブジェクトを返します。
from multiprocessing import Pool
import time
def slow_task(x):
time.sleep(2)
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
result = pool.apply_async(slow_task, (5,))
print("Task submitted")
# 他の処理ができる
print("Doing other work...")
# 結果を取得(ブロックする)
print(f"Result: {result.get()}")
apply_async() は即座に戻るので、結果を待つ間に他の処理を実行できます。
apply と apply_async の違い
同期的に実行し、結果を直接返す。結果が得られるまでブロック。
非同期に実行し、AsyncResult を返す。すぐに制御が戻る。
apply() は 1 つのタスクを同期的に実行するため、並列処理には向きません。
AsyncResult のメソッド
apply_async() が返す AsyncResult には、結果を取得するためのメソッドがあります。
| get(timeout) | 結果を取得(タイムアウト指定可) |
| ready() | タスクが完了したか確認 |
| successful() | 例外なく完了したか確認 |
| wait(timeout) | 完了を待機 |
複数のタスクを非同期に実行する
複数のタスクを投入し、それぞれの結果を取得できます。
from multiprocessing import Pool
import time
def process(n):
time.sleep(1)
return n * 2
if __name__ == "__main__":
with Pool(4) as pool:
results = [pool.apply_async(process, (i,)) for i in range(10)]
# すべての結果を取得
output = [r.get() for r in results]
print(output) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
タスクは並列に実行されるため、全体の処理時間は大幅に短縮されます。
タイムアウト付きで結果を取得する
get(timeout) でタイムアウトを指定できます。
from multiprocessing import Pool, TimeoutError
import time
def slow_task():
time.sleep(10)
return "Done"
if __name__ == "__main__":
with Pool() as pool:
result = pool.apply_async(slow_task)
try:
value = result.get(timeout=3)
except TimeoutError:
print("Task timed out")
タイムアウトすると multiprocessing.TimeoutError が発生します。
callback と error_callback
タスク完了時に呼ばれるコールバック関数を指定できます。
from multiprocessing import Pool
def compute(x):
return x ** 2
def on_success(result):
print(f"Success: {result}")
def on_error(error):
print(f"Error: {error}")
if __name__ == "__main__":
with Pool() as pool:
result = pool.apply_async(
compute,
(5,),
callback=on_success,
error_callback=on_error
)
result.wait()
callback は結果を引数に受け取り、error_callback は例外オブジェクトを受け取ります。
ready と successful で状態を確認する
ブロックせずにタスクの状態を確認できます。
from multiprocessing import Pool
import time
def task():
time.sleep(2)
return "Done"
if __name__ == "__main__":
with Pool() as pool:
result = pool.apply_async(task)
while not result.ready():
print("Still working...")
time.sleep(0.5)
if result.successful():
print(f"Result: {result.get()}")
else:
print("Task failed")
ready() は完了したかどうか、successful() は例外なく完了したかどうかを返します。
map_async との比較
map_async() は map() の非同期版です。
from multiprocessing import Pool
def square(x):
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
result = pool.map_async(square, range(10))
# 他の処理
print("Processing...")
# すべての結果を取得
print(result.get())
apply_async は 1 つのタスク
map_async は複数のタスクを一括投入
どちらも AsyncResult を返す
get() で結果を取得
大量のタスクを一括投入する場合は map_async() のほうが効率的です。