11.1 並行処理
プログラムを高速化したり、ネットワークの遅延に対応したりするために、同時に複数のことを行う「並行処理」が必要になります。 並行処理を考える上で、待ち時間の原因は主に2つに分けられます。
- I/Oバウンド: ディスクの読み書きやネットワーク通信などの入力/出力処理を待っている状態。
- CPUバウンド: 科学計算や画像処理など、CPUの計算そのものを待っている状態。
11.1.1 - 11.1.2 キューとプロセス (multiprocessing)
Section titled “11.1.1 - 11.1.2 キューとプロセス (multiprocessing)”複数のタスクを並行して効率よく処理する代表的な方法が「キュー(Queue)」です。皿洗いを例にすると、1人が洗って(パブリッシャ)、もう1人が拭く(サブスクライバ/ワーカー)作業を、独立したプロセス間で分担します。
multiprocessing モジュールと JoinableQueue を使った皿洗いの例です。
import multiprocessing as mp
# 洗浄担当(キューに皿を入れる)def washer(dishes, output): for dish in dishes: print('Washing', dish, 'dish') output.put(dish)
# 乾燥担当(キューから皿を取り出す)def dryer(input): while True: dish = input.get() print('Drying', dish, 'dish') input.task_done()
if __name__ == '__main__': dish_queue = mp.JoinableQueue()
# 乾燥担当を別プロセスとして起動(デーモン化してメイン終了時に終わるようにする) dryer_proc = mp.Process(target=dryer, args=(dish_queue,)) dryer_proc.daemon = True dryer_proc.start()
dishes = ['salad', 'bread', 'entree', 'dessert'] washer(dishes, dish_queue)
# キューのタスクがすべて終わるまで待機 dish_queue.join()11.1.3 スレッド (threading)
Section titled “11.1.3 スレッド (threading)”スレッドは、同じプロセス内で実行され、プロセス内のすべてのメモリ(グローバル変数など)を共有します。
import threading
def do_this(what): print("Thread %s says: %s" % (threading.current_thread(), what))
if __name__ == "__main__": for n in range(4): p = threading.Thread(target=do_this, args=("I'm function %s" % n,)) p.start()11.1.4 グリーンスレッドと gevent
Section titled “11.1.4 グリーンスレッドと gevent”gevent は、イベント駆動型の並行処理ライブラリです。通常の同期的なコードを書くだけで、背後で「グリーンレット(グリーンスレッド)」というブロックしないスレッドに変換して高速化してくれます。
# pip install gevent が必要import geventfrom gevent import socket
hosts = ['[www.crappytaxidermy.com](https://www.crappytaxidermy.com)', '[www.walterpottertaxidermy.com](https://www.walterpottertaxidermy.com)', '[www.antique-taxidermy.com](https://www.antique-taxidermy.com)']
# 非同期に名前解決を実行jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]gevent.joinall(jobs, timeout=5)
for job in jobs: print(job.value)モンキーパッチング: 標準ライブラリの socket などを一括で gevent 版に置き換える強力な手法もあります。
from gevent import monkeymonkey.patch_all() # プログラムの冒頭で呼び出す11.1.5 & 11.1.6 非同期フレームワーク (twisted, asyncio)
Section titled “11.1.5 & 11.1.6 非同期フレームワーク (twisted, asyncio)”twisted: コールバックを使った非同期のイベント駆動型ネットワーキングフレームワークです。asyncio: Python 3.4で標準ライブラリとして導入された非同期I/Oモジュール(開発コードネーム: Tulip)です。共通のイベントループを提供します。
11.1.7 Redis を使った複数マシンでのキュー
Section titled “11.1.7 Redis を使った複数マシンでのキュー”プロセスやスレッドは1台のマシン上で動作しますが、Redis を使うとネットワーク越しの複数マシン間にまたがるキューを簡単に作成できます。
洗浄担当 (パブリッシャ)
import redisconn = redis.Redis()
dishes = ['salad', 'bread', 'entree', 'dessert']for dish in dishes: # 右端にプッシュ (rpush) conn.rpush('dishes', dish.encode('utf-8'))
# 終了を知らせる番兵 (sentinel) を送るconn.rpush('dishes', 'quit')乾燥担当 (ワーカー)
import redisconn = redis.Redis()
while True: # 左端からブロック付きでポップ (blpop) msg = conn.blpop('dishes') if not msg: break
val = msg[1].decode('utf-8') if val == 'quit': break print('Dried', val)このアプローチにより、乾燥担当(ワーカー)のプロセスや別マシンを増やせば増やすほど、同時に処理できる量を簡単にスケールアップできます。さらに本格的なタスク管理が必要な場合は celery や rq といった専用ライブラリを使用します。