Stackless Python入門
Erlang,Scala,Goなどの並行プログラミング言語が注目を浴びているが,並行プログラミング言語の一つにStackless Pythonというものが存在する.Pythonで並行プログラミングが可能というのはとても魅力的なので,少し遊んでみた.
元ネタは
http://www.grant-olson.net/python/intro-to-stackless-python
http://www.disinterest.org/resource/stackless/2.6-docs-html/
あたり.
インストール
Stackless PythonはCPythonに拡張を施した,Pythonの変種・方言であり,それ自体がPythonインタプリタの配布系なので,既存のCPythonにインストールするといったことは出来ない.そのため,Stackless Pythonを使うには,CPythonとは別の,Stackless Python環境を一からインストールする必要がある.
インストールする環境は,Mac OS X Snow Leopard 10.6.4となる.
以下のようにダウンロードして,ディレクトリ移動.最新の安定バージョンは,Python 2.6.5か,Python 3.1.2のどちらかとなる(http://www.stackless.com/svn).ここでは3.1.2を選択する.
$ svn checkout http://svn.python.org/projects/stackless/tags/python-3.12/ $ cd python-3.12
コンパイルとインストール
$ ./configure --enable-stacklessfewerregisters --enable-ipv6 --enable-framework $ make $ sudo make altinstall
Macの場合はconfigureに--enable-stacklessfewerregistersを渡してやらないと,コンパイル時にエラーが起きてしまうので注意が必要である.
起動してみると,Stacklessになっているのがわかる.
$ python3.1 Python 3.1.2 Stackless 3.1b3 060516 (python-3.12:83238M, Jul 30 2010, 07:31:14) [GCC 4.2.1 (Apple Inc. build 5659)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>>
Macの場合,readlineがリンクされないためemacs風の移動ができないが,これはrlwrapを使えば出来るようになる.rlwrapはportからインストール可能である.
$ port install rlwrap $ rlwrap python3.1
rlwrapからpython3.1を起動してやれば,ctrl+aで行頭移動などが可能になる.
Tasklets
taskletはStackless Pythonでマイクロスレッドを構築するために用いられる.次のプログラムは,taskletを使って3つの関数呼び出しを行った例となる.
$ python3.1 >>> import stackless >>> >>> def print_x(x): ... print(x) ... >>> >>> stackless.tasklet(print_x)('one') # スケジューラのキューにタスクを追加 <stackless.tasklet object at 0x29c230> >>> stackless.tasklet(print_x)('two') <stackless.tasklet object at 0x29c6f0> >>> stackless.tasklet(print_x)('three') <stackless.tasklet object at 0x29c830> >>> >>> stackless.run() # キューにあるタスクを実行 one two three >>>
stackless.tasklet(func)(arg)でコールバック関数と,コールバック関数に渡す引数を指定して,スケジューラに登録していく.stackless.run()を呼び出すと,キューにあるタスクが順に実行される.
Scheduler
基本的に,タスクの実行はスケジュラーのキューに登録された順に実行される.しかしながら,タスクを一時中断し,他のタスクに実行権限を渡したい場合があるが,そのようなときはstackless.schedule()を呼び出す.
$ python3.1 >>> import stackless >>> >>> def print_three_times(x): ... print('1:', x) ... stackless.schedule() # 自分をスケジューラのキューの一番最後に持っていく ... print('2:', x) ... stackless.schedule() ... print('3:', x) ... stackless.schedule() ... >>> stackless.tasklet(print_three_times)('first') # スケジューラのキューにタスクを追加 <stackless.tasklet object at 0x29c230> >>> stackless.tasklet(print_three_times)('second') <stackless.tasklet object at 0x29c6f0> >>> stackless.tasklet(print_three_times)('third') <stackless.tasklet object at 0x29c830> >>> >>> stackless.run() # キューにあるタスクを実行 1: first 1: second 1: third 2: first 2: second 2: third 3: first 3: second 3: third >>>
stackless.schedule()を呼びだすと,現在実行中のタスクは一時中断され,タスクはスケジューラのキューの最後に移動される.タスクの再開は,stackless.schedule()で中断した次の行から実行が行われる.
Channels
タスク間で通信を行う場合は,channelを用いる.このチャネルは,ちょうどUnixのpipeに相当する機能を持っている.chennelを用いた例は以下のようになる.
>>> import stackless >>> >>> channel= stackless.channel() # 通信チャネルを作成 >>> >>> def receiving_tasklet(): ... print('Receiving tasklet started') ... print(channel.receive()) # チャネルからデータを読み込み ... print('Receiving tasklet finished') ... >>> def sending_tasklet(): ... print('Sending tasklet started') ... channel.send('send from sending_tasklet') # チャネルにデータを送信 ... print('sending tasklet finished') ... >>> def another_tasklet(): ... print('Just another tasklet in the scheduler') ... >>> stackless.tasklet(receiving_tasklet)() # スケジューラのキューにタスクを追加 <stackless.tasklet object at 0x29c230> >>> stackless.tasklet(sending_tasklet)() <stackless.tasklet object at 0x29c6f0> >>> stackless.tasklet(another_tasklet)() <stackless.tasklet object at 0x29c870> >>> >>> stackless.run() # キューにあるタスクを実行 Receiving tasklet started Sending tasklet started send from sending_tasklet Receiving tasklet finished Just another tasklet in the scheduler sending tasklet finished
stackless.channel()で通信チャネルを作成し,send(),receive()でチャネルへデータを送受信する.
receive()を呼び出すと,データ読み込みのためにブロックするが,データが到着した場合は,直ちに実行が再開される.
send()を呼びだすと,チャネルにデータが送信され,send()を呼び出した側のタスクは再スケジュール,つまりスケジュールのキューの一番最後に移動される.
ここで注意したいのは,send()を呼び出したときに,チャネルを受信しているプロセスがなかったら,そこでブロックしたままになるということである.その例が,次のプログラムとなる.
>>> >>> stackless.tasklet(sending_tasklet)() # スケジューラのキューに送信用タスクのみ追加 <stackless.tasklet object at 0x29c6f0> >>> stackless.tasklet(another_tasklet)() <stackless.tasklet object at 0x29c870> >>> >>> stackless.run() # キューにあるタスクを実行 Sending tasklet started Just another tasklet in the scheduler >>> >>> stackless.tasklet(another_tasklet)() # スケジューラのキューにタスクを追加 <stackless.tasklet object at 0x29c230> >>> stackless.run() # キューにあるタスクを実行 Just another tasklet in the scheduler # 送信用タスクは実行されない >>> >>> stackless.tasklet(receiving_tasklet)() # 受信用タスクをキューに追加 <stackless.tasklet object at 0x29c870> >>> >>> stackless.run() # キューにあるタスクを実行 Receiving tasklet started send from sending_tasklet Receiving tasklet finished sending tasklet finished # 送信が成功したので送信用タスクの実行が再開
はじめに送信用のタスクのみ登録しているが,受信用タスクは登録していないため,タスクが実行されたときはsend()でブロックしたままとなっている.しかしながら,後で受信用タスクを追加して,再びタスクを実行すると,send()が完了するため,送信用タスクは無事に実行が完了する.
Serialization
Stackless Pythonでは実行途中のタスクをシリアライズして,ファイルなどに保存することも可能である.やろうと思えば,途中で計算を終了させ,続きは他のマシンで計算させたりとかも可能である.
シリアライズは,Python標準のpickleを用いて行う.シリアライズを行った例は以下の通りとなる.
>>> import stackless, pickle >>> >>> def func(): ... busy_count = 0 ... while 1: ... busy_count += 1 ... if busy_count % 10 == 0: ... print(busy_count) ... >>> stackless.tasklet(func)() <stackless.tasklet object at 0x29c6f0> >>> >>> t1 = stackless.run(200) # タスクを実行. 10 # ただし,200インストラクション実行したところで停止させる 20 30 40 >>> s = pickle.dumps(t1) # 実行途中の状態をシリアライズ >>> >>> t1.kill() # 全タスクを終了 >>> >>> t2 = pickle.loads(s) # アンシリアライズ >>> t2.insert() # アンシリアライズしたタスクをスケジューラのキューに追加 <stackless.tasklet object at 0x461170> >>> t1 = stackless.run(200) # タスクを実行 50 60 70 80 90 100
まず,pickle.dumps()でタスクの状態をシリアライズし,データに保存する.シリアライズすれば,ファイルへの保存や,ネットワーク経由の転送も可能となる.なお,シリアライズした後は,kill()で現在のキューにあるタスクを全て停止させている.
次に,pickle.loads()でシリアライズされたタスクを,元に戻している.その後,復元したタスクを,insert()を呼び出して再びスケジューラのキューに追加し,stackless.run(200)と実行してみると,中断された箇所からタスクの実行が再開される.
Threading
追記:PythonにはGlobal Interpreter Lockというジャイアントロック機構があるので,どんなにスレッドを立ち上げても,マルチコアでの性能は向上しないそうです.なのでこの記事は間違いです
ここまできたら分かるとおり,Stackless Pythonはラウンドロビンでタスクを順に実行させているのみなので,そのままではマルチコアCPUなどのパフォーマンスをフルに引き出すことは出来ない.マルチコアCPUのパフォーマンスをフルに引き出すためには,スレッドと併用する必要がある.
Stackless Pythonでは,チャネルを用いてプロセス間通信を行っていたが,このチャネルはスレッドセーフとなっているため,異なるスレッド間での通信も,チャネルを用いて行うことが出来る.次のプログラムはマルチスレッドでチャネルを用いたプロセス間通信を行っている例となる.
>>> import threading, stackless >>> >>> commandChannel = stackless.channel() # チャネルを作成 >>> >>> def master_func(): ... print('MASTER STARTING') ... commandChannel.send('ECHO 1') ... commandChannel.send('ECHO 2') ... commandChannel.send('ECHO 3') ... commandChannel.send('QUIT') ... print('MASTER ENDING') ... >>> def slave_func(): ... print('SLAVE STARTING') ... while 1: ... commandChannel = stackless.channel() ... >>> def slave_func(): ... print('SLAVE STARTING') ... while 1: ... command = commandChannel.receive() ... print('SLAVE:', command) ... if command == 'QUIT': ... break ... print('SLAVE ENDING') ... >>> def scheduler_run(tasklet_func): ... t = stackless.tasklet(tasklet_func)() ... while t.alive: ... stackless.run() ... >>> thread = threading.Thread(target=scheduler_run, args=(master_func,)) >>> thread.start() # 送信用タスクを別スレッドで実行 MASTER STARTING >>> >>> scheduler_run(slave_func) SLAVE STARTING SLAVE: ECHO 1 SLAVE: ECHO 2 SLAVE: ECHO 3 SLAVE: QUIT SLAVE ENDING MASTER ENDING
threading.Thread()でスレッドを生成し,start()でスレッドを実行している.ここでは,メッセージ送信用のためのmaster_func()をコールバックとして設定している.別スレッドを立ち上げ,send()を呼びだすと,どこかでreceive()により受信されるまでブロックしたままとなる.
scheduler_run()は,これまで同様に受信用タスクのslave_func()を起動しているだけである.slave_func()では,receive()でチャネルからメッセージを取得している.
このように,マルチスレッドになっても基本的なことは殆ど変わっていない事がわかる.
Actor Model
アクターモデルの並行プログラミングでは,軽量プロセスとプロセス間通信のパイプが重要な役割を果たす.スレッドモデルでは,データ・ステートなどの情報は共有メモリ上に置かれ,それらを安全に変更するためにはロックが必要であり,これがマルチスレッドプログラミングを難しくさせる原因となっていた.
しかしながら,ErlangやScala,Go,Stackless Pythonを初めとするアクターモデルに基づく並行プログラミング言語では,共有メモリを用いたデータ・ステートの変更は出来る限り行わないようにしている.その結果,従来のスレッドモデルと比較して,非常に簡単に並行プログラミングが行えるようになっている.
アクターモデルで並行プログラミングを行う際に設計すべきなのは,通信プロトコルと,各プロセスのステート管理についてだけである.ロックは必要ない.