Python の Semaphore で同時アクセス数を制限する

Semaphore(セマフォ)は、同時にリソースにアクセスできるスレッド数を制限するための同期機構です。Lock が「1つのスレッドだけ」を許可するのに対し、Semaphore は「N個のスレッドまで」を許可できます。

Semaphore の基本

Semaphore はカウンターを持っており、acquire() でカウンターを減らし、release() で増やします。カウンターが0になると、他のスレッドは待機します。

import threading
import time

# 同時に3つまでアクセス可能
semaphore = threading.Semaphore(3)

def worker(n):
    with semaphore:
        print(f"Worker {n} 開始")
        time.sleep(2)
        print(f"Worker {n} 終了")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

この例では、一度に実行されるのは最大3つのワーカーです。

Lock と Semaphore の比較

Lock

同時に1つのスレッドだけがアクセス可能。カウンターは0か1。

Semaphore

同時にN個のスレッドがアクセス可能。カウンターは0からN。

Semaphore(1)Lock と同様の動作になります。

実用例:接続プール

データベース接続など、同時接続数を制限したい場合に使えます。

import threading
import time
import random

class ConnectionPool:
    def __init__(self, max_connections):
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = []
    
    def get_connection(self):
        self.semaphore.acquire()
        return f"Connection-{threading.current_thread().name}"
    
    def release_connection(self, conn):
        self.semaphore.release()

pool = ConnectionPool(3)

def worker():
    conn = pool.get_connection()
    print(f"{conn} を取得")
    time.sleep(random.uniform(1, 3))  # 処理をシミュレート
    print(f"{conn} を解放")
    pool.release_connection(conn)

threads = [threading.Thread(target=worker, name=f"Thread-{i}") for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

実用例:レート制限

API呼び出しのレート制限をシミュレートします。

import threading
import time

# 同時に2つまでAPI呼び出し可能
rate_limiter = threading.Semaphore(2)

def api_call(request_id):
    with rate_limiter:
        print(f"Request {request_id}: API呼び出し開始")
        time.sleep(1)  # API処理時間
        print(f"Request {request_id}: API呼び出し完了")

threads = [threading.Thread(target=api_call, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

BoundedSemaphore

BoundedSemaphore は、release() の呼び出し回数が acquire() を超えるとエラーになります。

import threading

# 通常の Semaphore
sem = threading.Semaphore(2)
sem.release()  # カウンターが3になる(エラーにならない)
print("通常: release多すぎてもエラーにならない")

# BoundedSemaphore
bsem = threading.BoundedSemaphore(2)
try:
    bsem.release()  # ValueError が発生
except ValueError as e:
    print(f"BoundedSemaphore: {e}")

BoundedSemaphore はプログラムのバグを検出するのに役立ちます。

with 文での使用

Semaphore もコンテキストマネージャをサポートしています。

import threading

semaphore = threading.Semaphore(3)

def task():
    with semaphore:
        # この中は最大3スレッドまで同時実行
        pass

タイムアウト付きの取得

import threading
import time

semaphore = threading.Semaphore(1)

def worker(name):
    acquired = semaphore.acquire(timeout=2)
    if acquired:
        try:
            print(f"{name}: セマフォ取得")
            time.sleep(3)
        finally:
            semaphore.release()
    else:
        print(f"{name}: タイムアウト")

t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))

t1.start()
time.sleep(0.1)
t2.start()

t1.join()
t2.join()

Semaphore は、リソースの同時利用数を制限したい場面で非常に有用です。