並列処理(プロセスの開始とプロセス間通信まで)
Getting Started With Erlangの続き。いよいよ並列処理(3 Concurrent Programming)です!
プロセス
Erlangのスレッドはデータを共有しないため、プロセスと呼ばれます。プロセスは組み込み関数の「spawn(Module, Exported_Function, List of Arguments). 」で開始します。
- 引数でプロセスが実行するモジュール、関数名および引数を指定します。
- 実行する関数はモジュールでexportされているものに限られます。
- 戻り値でプロセスIDが返されます。
プロセスのサンプルです。3匹のねこが並列に鳴きます。
-module(process). -export([start/0, call/1]). % mii, tora, shiroが並列に鳴く。 start() -> spawn( process, call, [mii] ), % モジュール、関数名, 引数の順に指定 spawn( process, call, [tora] ), spawn( process, call, [shiro] ). % ねこを呼ぶ call( K ) -> call(K, 0). call( K, 3 ) -> ok; call( K, I ) -> io:format( "~w : meow!~n", [K] ), call( K, I+1 ).
実行結果です。途中で表示されている「<0.51.0>」がstart()の戻り値(=spawn( process, call, [shiro] )の戻り値)でプロセスIDになります。
5> process:start(). mii : meow! tora : meow! shiro : meow! <0.51.0>mii : meow! tora : meow! shiro : meow! mii : meow! tora : meow! shiro : meow!
プロセス間通信
プロセス間通信は次の3つを使って行います。
- receive構文
- メッセージ送信演算子(!)
- メッセージキュー
receive構文
receive構文はプロセスを「メッセージ待ち受け状態」にする構文です。以下の形式で書きます。
receive <パターン1> -> <処理>; <パターン2> -> <処理>; <パターン3> -> <処理>; <パターン4> -> <処理> end.
プロセスはreceive構文にさしかかると、
- 自身の「メッセージキュー」からメッセージを取り出し、パターンとマッチするか評価します。
- マッチするパターンがあった場合、メッセージがキューから削除されそれに続く処理が実行されます。
- マッチするパターンが存在しない場合、メッセージはそのままでキューにある次のメッセージが評価されます。
- メッセージがキューにない場合やすべてのメッセージがパターンとマッチしなかった場合、プロセスはブロックされ、メッセージ待ち状態になります。
メッセージ送信演算子(!)
メッセージ送信演算子(!)でプロセスにメッセージを送信します。使い方は次の通り。
<プロセスID> ! <メッセージ>
プロセスIDはspawn()関数から返されるものを指定します。メッセージには任意のアトムやタプルやリストが使えます。送信されたメッセージはプロセスのメッセージキューに格納され、このときプロセスがreceiveで待ち状態になっていれば処理が再開されることになります。
メッセージキュー
Erlangのすべてのプロセスはメッセージキューを持ちます。メッセージ送信演算子(!)でプロセスに送付されたメッセージはキューに格納されます。receiveでブロックしたプロセスはメッセージキューを監視して、クライアントからの要求を待ちます。
通信してみる
以下はプロセスを使った簡単なサーバーです。
- 起動すると、クライアントからのメッセージを待つ状態になります。
- 適切なメッセージが送付されると、一定のアクションを実行し、再度待ち状態になります。
- sleepのアトムがメッセージとして送付されると、プロセスを終了します。
-module(kitten). -export([start/1, run/1]). % ねこサーバー。 % メッセージに応じて一定のアクションを行う。 start(Name) -> spawn( kitten, run, [Name] ). run( Name ) -> % receiveでメッセージを待つ。 % メッセージが送付され、条件にマッチすると、それに続く処理が実行される。 receive run -> io:format( "~w : run!~n", [Name] ), run(Name); % 処理完了後再度待ち状態に遷移する。 jump -> io:format( "~w : jump!~n", [Name] ), run(Name); stop -> io:format( "~w : stop~n", [Name] ), run(Name); sleep -> ok; % sleepで終了 end.
実行してみます。まずはサーバーを起動。
3> P = kitten:start(mii). <0.40.0>
接続してみます。
4> P ! run. mii : run! run 5> P ! jump. mii : jump! jump 6> P ! stop. mii : stop stop 7> P ! sleep. sleep
おー。