Python の Semaphore で同時アクセス数を制限する
Semaphore(セマフォ)は、同時にリソースにアクセスできるスレッド数を制限するための同期機構です。Lock が「1つのスレッドだけ」を許可するのに対し、Semaphore は「N個のスレッドまで」を許可できます。
Semaphore の基本
Semaphore はカウンターを持っており、acquire() でカウンターを減らし、release() で増やします。カウンターが0になると、他のスレッドは待機します。
import threading
import time
# 同時に3つまでアクセス可能
semaphore = threading.Semaphore(3)
def worker(n):
with semaphore:
print(f"Worker {n} 開始")
time.sleep(2)
print(f"Worker {n} 終了")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(6)]
for t in threads:
t.start()
for t in threads:
t.join()この例では、一度に実行されるのは最大3つのワーカーです。
Lock と Semaphore の比較
Lock
同時に1つのスレッドだけがアクセス可能。カウンターは0か1。
Semaphore
同時にN個のスレッドがアクセス可能。カウンターは0からN。
Semaphore(1) は Lock と同様の動作になります。
実用例:接続プール
データベース接続など、同時接続数を制限したい場合に使えます。
import threading
import time
import random
class ConnectionPool:
def __init__(self, max_connections):
self.semaphore = threading.Semaphore(max_connections)
self.connections = []
def get_connection(self):
self.semaphore.acquire()
return f"Connection-{threading.current_thread().name}"
def release_connection(self, conn):
self.semaphore.release()
pool = ConnectionPool(3)
def worker():
conn = pool.get_connection()
print(f"{conn} を取得")
time.sleep(random.uniform(1, 3)) # 処理をシミュレート
print(f"{conn} を解放")
pool.release_connection(conn)
threads = [threading.Thread(target=worker, name=f"Thread-{i}") for i in range(6)]
for t in threads:
t.start()
for t in threads:
t.join()実用例:レート制限
API呼び出しのレート制限をシミュレートします。
import threading
import time
# 同時に2つまでAPI呼び出し可能
rate_limiter = threading.Semaphore(2)
def api_call(request_id):
with rate_limiter:
print(f"Request {request_id}: API呼び出し開始")
time.sleep(1) # API処理時間
print(f"Request {request_id}: API呼び出し完了")
threads = [threading.Thread(target=api_call, args=(i,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()BoundedSemaphore
BoundedSemaphore は、release() の呼び出し回数が acquire() を超えるとエラーになります。
import threading
# 通常の Semaphore
sem = threading.Semaphore(2)
sem.release() # カウンターが3になる(エラーにならない)
print("通常: release多すぎてもエラーにならない")
# BoundedSemaphore
bsem = threading.BoundedSemaphore(2)
try:
bsem.release() # ValueError が発生
except ValueError as e:
print(f"BoundedSemaphore: {e}")BoundedSemaphore はプログラムのバグを検出するのに役立ちます。
with 文での使用
Semaphore もコンテキストマネージャをサポートしています。
import threading
semaphore = threading.Semaphore(3)
def task():
with semaphore:
# この中は最大3スレッドまで同時実行
passタイムアウト付きの取得
import threading
import time
semaphore = threading.Semaphore(1)
def worker(name):
acquired = semaphore.acquire(timeout=2)
if acquired:
try:
print(f"{name}: セマフォ取得")
time.sleep(3)
finally:
semaphore.release()
else:
print(f"{name}: タイムアウト")
t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))
t1.start()
time.sleep(0.1)
t2.start()
t1.join()
t2.join()Semaphore は、リソースの同時利用数を制限したい場面で非常に有用です。



