Python のスレッドセーフなデータ構造
マルチスレッドプログラミングでは、複数のスレッドが同時にデータにアクセスする可能性があるため、「スレッドセーフ」なデータ構造を選ぶことが重要です。
スレッドセーフとは
スレッドセーフなデータ構造は、複数のスレッドから同時にアクセスされても、データの整合性が保たれます。
スレッドセーフ
ロックなしで安全に使える。内部で同期処理が行われている。
非スレッドセーフ
同時アクセスで不整合が起きる可能性。明示的なロックが必要。
queue.Queue
queue.Queue は最も代表的なスレッドセーフなデータ構造です。
from queue import Queue
import threading
q = Queue()
def producer():
for i in range(100):
q.put(i)
def consumer():
for _ in range(100):
item = q.get()
q.task_done()
# ロックなしで安全に使える
threads = [
threading.Thread(target=producer),
threading.Thread(target=consumer),
]
for t in threads:
t.start()
for t in threads:
t.join()collections.deque
collections.deque は両端キューで、append() と popleft() がスレッドセーフです。
from collections import deque
import threading
d = deque()
def producer():
for i in range(1000):
d.append(i) # スレッドセーフ
def consumer():
count = 0
while count < 1000:
try:
d.popleft() # スレッドセーフ
count += 1
except IndexError:
pass
threads = [
threading.Thread(target=producer),
threading.Thread(target=consumer),
]
for t in threads:
t.start()
for t in threads:
t.join()ただし、deque の len() や複合操作(例:「空でなければ取り出す」)はスレッドセーフではありません。
リストは非スレッドセーフ
Python のリストは、append() など一部の操作は GIL により事実上安全ですが、公式にはスレッドセーフではありません。
import threading
# 危険な例
shared_list = []
def append_items():
for i in range(10000):
shared_list.append(i)
# これは動作するかもしれないが、保証されていない
threads = [threading.Thread(target=append_items) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()安全のためにはロックを使うか、Queue を使いましょう。
辞書も注意が必要
辞書の個別操作は GIL で保護されますが、複合操作は危険です。
import threading
d = {}
# 危険:複合操作
def update_counter(key):
for _ in range(10000):
if key in d:
d[key] += 1 # 読み取り→加算→書き込み の間に割り込みあり
else:
d[key] = 1安全にするにはロックが必要です。
import threading
d = {}
lock = threading.Lock()
def update_counter(key):
for _ in range(10000):
with lock:
if key in d:
d[key] += 1
else:
d[key] = 1スレッドセーフな操作の例
| 操作 | スレッドセーフ |
|---|---|
| Queue.put() / get() | ○ |
| deque.append() / popleft() | ○ |
| list.append() | △(事実上安全だが非保証) |
| dict[key] = value | △(単一操作なら) |
| dict の読み取り→更新→書き込み | × |
concurrent.futures を使う
スレッドプールを使う場合は、concurrent.futures が便利です。
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(task, range(10)))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]まとめ
推奨
queue.Queue を使う。明示的に Lock を使う。ThreadPoolExecutor を使う。
避けるべき
グローバルなリストや辞書を複数スレッドで変更。複合操作をロックなしで行う。
マルチスレッドでデータを共有する場合は、スレッドセーフなデータ構造を選ぶか、適切にロックを使いましょう。



