WSGI サーバーを自作する
WSGI サーバーを自作することで、HTTP 通信の低レベルな処理から WSGI インターフェースへの橋渡しまで、Web サーバーの動作原理を体系的に理解できる。本番環境では Gunicorn や uWSGI を使うべきだが、自作を通じて得られる知識は、トラブルシューティングやパフォーマンスチューニングの基盤となる。
最小限の WSGI サーバー
まずは動作する最小限の WSGI サーバーを実装する。標準ライブラリの socket モジュールを使い、TCP 接続を受け付けて HTTP リクエストをパースし、WSGI アプリケーションを呼び出す。
import socket
import sys
from io import BytesIO
class SimpleWSGIServer:
def __init__(self, host, port):
self.host = host
self.port = port
self.app = None
def set_app(self, app):
self.app = app
def serve_forever(self):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.host, self.port))
server_socket.listen(5)
print(f'Serving on {self.host}:{self.port}')
while True:
client_socket, client_address = server_socket.accept()
self.handle_request(client_socket)
client_socket.close()serve_forever メソッドでソケットをリッスンし、接続を受け付けたら handle_request でリクエストを処理する。
HTTP リクエストのパース
HTTP リクエストを受け取り、パースして environ 辞書を構築する。
def handle_request(self, client_socket):
request_data = client_socket.recv(4096)
if not request_data:
return
# リクエストをパース
environ = self.parse_request(request_data)
# WSGI アプリケーションを呼び出す
response_body = self.call_app(environ)
# レスポンスを送信
client_socket.sendall(response_body)
def parse_request(self, request_data):
lines = request_data.decode('iso-8859-1').split('\r\n')
request_line = lines[0]
method, path, protocol = request_line.split()
# パスとクエリ文字列を分離
if '?' in path:
path_info, query_string = path.split('?', 1)
else:
path_info, query_string = path, ''
# ヘッダーをパース
headers = {}
i = 1
while i < len(lines) and lines[i]:
key, value = lines[i].split(': ', 1)
headers[key] = value
i += 1
# リクエストボディ
body = '\r\n'.join(lines[i+1:]) if i + 1 < len(lines) else ''
# environ 辞書を構築
environ = {
'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': path_info,
'QUERY_STRING': query_string,
'SERVER_NAME': self.host,
'SERVER_PORT': str(self.port),
'SERVER_PROTOCOL': protocol,
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': BytesIO(body.encode('iso-8859-1')),
'wsgi.errors': sys.stderr,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
}
# HTTP ヘッダーを environ に追加
for key, value in headers.items():
key = key.upper().replace('-', '_')
if key == 'CONTENT_TYPE':
environ['CONTENT_TYPE'] = value
elif key == 'CONTENT_LENGTH':
environ['CONTENT_LENGTH'] = value
else:
environ[f'HTTP_{key}'] = value
return environHTTP ヘッダーは HTTP_ プレフィックスを付けて environ に格納する。ただし Content-Type と Content-Length は例外で、プレフィックスなしで格納する。これは CGI 仕様との互換性のためだ。
WSGI アプリケーションの呼び出し
environ を構築したら、WSGI アプリケーションを呼び出す。start_response コールバックを実装し、レスポンスを組み立てる。
def call_app(self, environ):
response_started = False
status = None
response_headers = []
def start_response(response_status, headers, exc_info=None):
nonlocal response_started, status, response_headers
if exc_info:
if response_started:
# ヘッダー送信済みなら例外を再送出
raise exc_info[1].with_traceback(exc_info[2])
exc_info = None
elif response_started:
raise RuntimeError('Response already started')
status = response_status
response_headers = headers
response_started = True
# write 関数を返す(非推奨だが仕様で必要)
return lambda s: None
# アプリケーションを呼び出し
result = self.app(environ, start_response)
# レスポンスボディを収集
try:
body_parts = []
for chunk in result:
body_parts.append(chunk)
body = b''.join(body_parts)
finally:
# close() メソッドがあれば呼び出す
if hasattr(result, 'close'):
result.close()
# HTTP レスポンスを構築
response_line = f'HTTP/1.1 {status}\r\n'
headers_text = ''.join(f'{k}: {v}\r\n' for k, v in response_headers)
return response_line.encode() + headers_text.encode() + b'\r\n' + bodystart_response は状態を保持するクロージャとして実装している。アプリケーションがこの関数を呼び出すと、ステータスとヘッダーが記録される。
完全な実装
ここまでの部品を組み合わせた完全な実装を示す。
import socket
import sys
from io import BytesIO
class SimpleWSGIServer:
def __init__(self, host='localhost', port=8000):
self.host = host
self.port = port
self.app = None
def set_app(self, app):
self.app = app
def serve_forever(self):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.host, self.port))
server_socket.listen(5)
print(f'Serving on http://{self.host}:{self.port}')
try:
while True:
client_socket, client_address = server_socket.accept()
try:
self.handle_request(client_socket)
except Exception as e:
print(f'Error handling request: {e}', file=sys.stderr)
finally:
client_socket.close()
except KeyboardInterrupt:
print('\nShutting down...')
finally:
server_socket.close()
def handle_request(self, client_socket):
request_data = client_socket.recv(65536)
if not request_data:
return
environ = self.parse_request(request_data)
response = self.call_app(environ)
client_socket.sendall(response)
def parse_request(self, request_data):
# ヘッダーとボディを分離
if b'\r\n\r\n' in request_data:
header_data, body_data = request_data.split(b'\r\n\r\n', 1)
else:
header_data, body_data = request_data, b''
lines = header_data.decode('iso-8859-1').split('\r\n')
request_line = lines[0]
method, path, protocol = request_line.split()
if '?' in path:
path_info, query_string = path.split('?', 1)
else:
path_info, query_string = path, ''
headers = {}
for line in lines[1:]:
if ': ' in line:
key, value = line.split(': ', 1)
headers[key] = value
environ = {
'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': path_info,
'QUERY_STRING': query_string,
'SERVER_NAME': self.host,
'SERVER_PORT': str(self.port),
'SERVER_PROTOCOL': protocol,
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': BytesIO(body_data),
'wsgi.errors': sys.stderr,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
}
for key, value in headers.items():
key_upper = key.upper().replace('-', '_')
if key_upper == 'CONTENT_TYPE':
environ['CONTENT_TYPE'] = value
elif key_upper == 'CONTENT_LENGTH':
environ['CONTENT_LENGTH'] = value
else:
environ[f'HTTP_{key_upper}'] = value
return environ
def call_app(self, environ):
status = None
response_headers = []
def start_response(response_status, headers, exc_info=None):
nonlocal status, response_headers
if exc_info:
try:
if status:
raise exc_info[1].with_traceback(exc_info[2])
finally:
exc_info = None
status = response_status
response_headers = headers
return lambda s: None
result = self.app(environ, start_response)
try:
body = b''.join(result)
finally:
if hasattr(result, 'close'):
result.close()
response = f'HTTP/1.1 {status}\r\n'.encode()
for name, value in response_headers:
response += f'{name}: {value}\r\n'.encode()
response += b'\r\n' + body
return response
def make_server(host, port, app):
server = SimpleWSGIServer(host, port)
server.set_app(app)
return server動作確認
自作サーバーで WSGI アプリケーションを動かしてみる。
def application(environ, start_response):
path = environ['PATH_INFO']
method = environ['REQUEST_METHOD']
body = f'Method: {method}\nPath: {path}\n'.encode('utf-8')
start_response('200 OK', [
('Content-Type', 'text/plain; charset=utf-8'),
('Content-Length', str(len(body))),
])
return [body]
if __name__ == '__main__':
server = make_server('localhost', 8000, application)
server.serve_forever()このサーバーを起動し、ブラウザや curl でアクセスすると、リクエスト情報が表示される。
$ curl http://localhost:8000/hello?name=world
Method: GET
Path: /hello制限事項と改善点
この実装は教育目的のものであり、本番環境では使えない。主な制限事項を挙げる。
シングルスレッドで動作するため、同時に 1 つのリクエストしか処理できない。マルチスレッド化やイベント駆動への拡張が必要。
チャンク転送エンコーディング、Keep-Alive、大きなリクエストボディなどに対応していない。
改善するには、以下のような機能を追加する必要がある。
マルチスレッド / マルチプロセス対応
完全な HTTP/1.1 パーサー
Keep-Alive 対応
エラーハンドリングの強化
標準ライブラリの wsgiref.simple_server は、これらの一部に対応した参照実装だ。自作サーバーと比較することで、実装の違いを学べる。
# 標準ライブラリの WSGI サーバー
from wsgiref.simple_server import make_server
server = make_server('localhost', 8000, application)
server.serve_forever()WSGI サーバーを自作する経験は、HTTP プロトコルと WSGI 仕様の両方を深く理解する機会となる。本番環境では成熟したサーバーを使うべきだが、その動作原理を知っていることは、問題解決やパフォーマンス改善の際に大きな助けとなる。











