Python の Value と Array で共有メモリを使う

Python の multiprocessing モジュールには、プロセス間でメモリを共有するための ValueArray が用意されています。これらを使うと、pickle によるシリアライズのオーバーヘッドなしにデータを共有できます。

Value の基本

Value は単一の値を共有するためのクラスです。

from multiprocessing import Process, Value

def increment(counter):
    for _ in range(1000):
        counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # 'i' は int 型
    
    p1 = Process(target=increment, args=(counter,))
    p2 = Process(target=increment, args=(counter,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(counter.value)  # 2000 にならないことがある

ただし、この例では競合状態が発生する可能性があります。後述のロックと組み合わせて使う必要があります。

型コード

ValueArray には型コードを指定します。

'i'int(符号付き整数)
'I'unsigned int(符号なし整数)
'd'double(倍精度浮動小数点)
'f'float(単精度浮動小数点)
'c'char(1 文字)
'b'signed char
'B'unsigned char

型コードは Python の array モジュールと同じ形式です。

Array の基本

Array は配列を共有するためのクラスです。

from multiprocessing import Process, Array

def fill_array(arr):
    for i in range(len(arr)):
        arr[i] = i * 2

if __name__ == "__main__":
    arr = Array('i', 5)  # int 型の配列、要素数 5
    
    p = Process(target=fill_array, args=(arr,))
    p.start()
    p.join()
    
    print(arr[:])  # [0, 2, 4, 6, 8]

arr[:] でリストに変換して表示できます。

ロック付きでアクセスする

ValueArray にはデフォルトでロックが組み込まれています。get_lock() でロックを取得して使います。

from multiprocessing import Process, Value

def safe_increment(counter):
    for _ in range(1000):
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)
    
    p1 = Process(target=safe_increment, args=(counter,))
    p2 = Process(target=safe_increment, args=(counter,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(counter.value)  # 必ず 2000 になる

with counter.get_lock(): でロックを取得し、安全に値を更新できます。

ロックなしの Value と Array

パフォーマンスを重視する場合は、lock=False でロックなしのオブジェクトを作成できます。

from multiprocessing import Process, Value, Lock

def worker(counter, lock):
    for _ in range(1000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0, lock=False)
    lock = Lock()
    
    p1 = Process(target=worker, args=(counter, lock))
    p2 = Process(target=worker, args=(counter, lock))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(counter.value)

この場合、別途 Lock を作成して同期を管理します。

Array の初期化

Array は初期値を指定して作成することもできます。

from multiprocessing import Array

# サイズを指定(0 で初期化)
arr1 = Array('i', 5)

# 初期値を指定
arr2 = Array('i', [1, 2, 3, 4, 5])

# range から作成
arr3 = Array('d', range(10))

制限事項

ValueArray は ctypes の型に限定されます。

リストや辞書などの複雑なオブジェクトは格納できない
文字列を格納するには 'c' 型の Array を使う
任意のオブジェクトを共有したい場合は Manager を使う

複雑なデータ構造を共有したい場合は、multiprocessing.Manager の使用を検討してください。