Python のスレッドセーフなデータ構造

マルチスレッドプログラミングでは、複数のスレッドが同時にデータにアクセスする可能性があるため、「スレッドセーフ」なデータ構造を選ぶことが重要です。

スレッドセーフとは

スレッドセーフなデータ構造は、複数のスレッドから同時にアクセスされても、データの整合性が保たれます。

スレッドセーフ

ロックなしで安全に使える。内部で同期処理が行われている。

非スレッドセーフ

同時アクセスで不整合が起きる可能性。明示的なロックが必要。

queue.Queue

queue.Queue は最も代表的なスレッドセーフなデータ構造です。

from queue import Queue
import threading

q = Queue()

def producer():
    for i in range(100):
        q.put(i)

def consumer():
    for _ in range(100):
        item = q.get()
        q.task_done()

# ロックなしで安全に使える
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

collections.deque

collections.deque は両端キューで、append()popleft() がスレッドセーフです。

from collections import deque
import threading

d = deque()

def producer():
    for i in range(1000):
        d.append(i)  # スレッドセーフ

def consumer():
    count = 0
    while count < 1000:
        try:
            d.popleft()  # スレッドセーフ
            count += 1
        except IndexError:
            pass

threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

ただし、dequelen() や複合操作(例:「空でなければ取り出す」)はスレッドセーフではありません。

リストは非スレッドセーフ

Python のリストは、append() など一部の操作は GIL により事実上安全ですが、公式にはスレッドセーフではありません。

import threading

# 危険な例
shared_list = []

def append_items():
    for i in range(10000):
        shared_list.append(i)

# これは動作するかもしれないが、保証されていない
threads = [threading.Thread(target=append_items) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

安全のためにはロックを使うか、Queue を使いましょう。

辞書も注意が必要

辞書の個別操作は GIL で保護されますが、複合操作は危険です。

import threading

d = {}

# 危険:複合操作
def update_counter(key):
    for _ in range(10000):
        if key in d:
            d[key] += 1  # 読み取り→加算→書き込み の間に割り込みあり
        else:
            d[key] = 1

安全にするにはロックが必要です。

import threading

d = {}
lock = threading.Lock()

def update_counter(key):
    for _ in range(10000):
        with lock:
            if key in d:
                d[key] += 1
            else:
                d[key] = 1

スレッドセーフな操作の例

操作スレッドセーフ
Queue.put() / get()
deque.append() / popleft()
list.append()△(事実上安全だが非保証)
dict[key] = value△(単一操作なら)
dict の読み取り→更新→書き込み×

concurrent.futures を使う

スレッドプールを使う場合は、concurrent.futures が便利です。

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(task, range(10)))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

まとめ

推奨

queue.Queue を使う。明示的に Lock を使う。ThreadPoolExecutor を使う。

避けるべき

グローバルなリストや辞書を複数スレッドで変更。複合操作をロックなしで行う。

マルチスレッドでデータを共有する場合は、スレッドセーフなデータ構造を選ぶか、適切にロックを使いましょう。