Skip to content

11.1 並行処理

プログラムを高速化したり、ネットワークの遅延に対応したりするために、同時に複数のことを行う「並行処理」が必要になります。 並行処理を考える上で、待ち時間の原因は主に2つに分けられます。

  • I/Oバウンド: ディスクの読み書きやネットワーク通信などの入力/出力処理を待っている状態。
  • CPUバウンド: 科学計算や画像処理など、CPUの計算そのものを待っている状態。

11.1.1 - 11.1.2 キューとプロセス (multiprocessing)

Section titled “11.1.1 - 11.1.2 キューとプロセス (multiprocessing)”

複数のタスクを並行して効率よく処理する代表的な方法が「キュー(Queue)」です。皿洗いを例にすると、1人が洗って(パブリッシャ)、もう1人が拭く(サブスクライバ/ワーカー)作業を、独立したプロセス間で分担します。

multiprocessing モジュールと JoinableQueue を使った皿洗いの例です。

import multiprocessing as mp
# 洗浄担当(キューに皿を入れる)
def washer(dishes, output):
for dish in dishes:
print('Washing', dish, 'dish')
output.put(dish)
# 乾燥担当(キューから皿を取り出す)
def dryer(input):
while True:
dish = input.get()
print('Drying', dish, 'dish')
input.task_done()
if __name__ == '__main__':
dish_queue = mp.JoinableQueue()
# 乾燥担当を別プロセスとして起動(デーモン化してメイン終了時に終わるようにする)
dryer_proc = mp.Process(target=dryer, args=(dish_queue,))
dryer_proc.daemon = True
dryer_proc.start()
dishes = ['salad', 'bread', 'entree', 'dessert']
washer(dishes, dish_queue)
# キューのタスクがすべて終わるまで待機
dish_queue.join()

スレッドは、同じプロセス内で実行され、プロセス内のすべてのメモリ(グローバル変数など)を共有します。

import threading
def do_this(what):
print("Thread %s says: %s" % (threading.current_thread(), what))
if __name__ == "__main__":
for n in range(4):
p = threading.Thread(target=do_this, args=("I'm function %s" % n,))
p.start()

gevent は、イベント駆動型の並行処理ライブラリです。通常の同期的なコードを書くだけで、背後で「グリーンレット(グリーンスレッド)」というブロックしないスレッドに変換して高速化してくれます。

# pip install gevent が必要
import gevent
from gevent import socket
hosts = ['[www.crappytaxidermy.com](https://www.crappytaxidermy.com)', '[www.walterpottertaxidermy.com](https://www.walterpottertaxidermy.com)', '[www.antique-taxidermy.com](https://www.antique-taxidermy.com)']
# 非同期に名前解決を実行
jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
print(job.value)

モンキーパッチング: 標準ライブラリの socket などを一括で gevent 版に置き換える強力な手法もあります。

from gevent import monkey
monkey.patch_all() # プログラムの冒頭で呼び出す

11.1.5 & 11.1.6 非同期フレームワーク (twisted, asyncio)

Section titled “11.1.5 & 11.1.6 非同期フレームワーク (twisted, asyncio)”
  • twisted: コールバックを使った非同期のイベント駆動型ネットワーキングフレームワークです。
  • asyncio: Python 3.4で標準ライブラリとして導入された非同期I/Oモジュール(開発コードネーム: Tulip)です。共通のイベントループを提供します。

11.1.7 Redis を使った複数マシンでのキュー

Section titled “11.1.7 Redis を使った複数マシンでのキュー”

プロセスやスレッドは1台のマシン上で動作しますが、Redis を使うとネットワーク越しの複数マシン間にまたがるキューを簡単に作成できます。

洗浄担当 (パブリッシャ)

redis_washer.py
import redis
conn = redis.Redis()
dishes = ['salad', 'bread', 'entree', 'dessert']
for dish in dishes:
# 右端にプッシュ (rpush)
conn.rpush('dishes', dish.encode('utf-8'))
# 終了を知らせる番兵 (sentinel) を送る
conn.rpush('dishes', 'quit')

乾燥担当 (ワーカー)

redis_dryer.py
import redis
conn = redis.Redis()
while True:
# 左端からブロック付きでポップ (blpop)
msg = conn.blpop('dishes')
if not msg:
break
val = msg[1].decode('utf-8')
if val == 'quit':
break
print('Dried', val)

このアプローチにより、乾燥担当(ワーカー)のプロセスや別マシンを増やせば増やすほど、同時に処理できる量を簡単にスケールアップできます。さらに本格的なタスク管理が必要な場合は celeryrq といった専用ライブラリを使用します。