無料で使えるシステムトレードフレームワーク「Jiji」 をリリースしました!

・OANDA Trade APIを利用した、オープンソースのシステムトレードフレームワークです。
・自分だけの取引アルゴリズムで、誰でも、いますぐ、かんたんに、自動取引を開始できます。

並列処理(プロセスの開始とプロセス間通信まで)

Getting Started With Erlangの続き。いよいよ並列処理(3 Concurrent Programming)です!

プロセス

Erlangのスレッドはデータを共有しないため、プロセスと呼ばれます。プロセスは組み込み関数の「spawn(Module, Exported_Function, List of Arguments). 」で開始します。

  • 引数でプロセスが実行するモジュール、関数名および引数を指定します。
  • 実行する関数はモジュールでexportされているものに限られます。
  • 戻り値でプロセスIDが返されます。

プロセスのサンプルです。3匹のねこが並列に鳴きます。

-module(process).
-export([start/0, call/1]).

% mii, tora, shiroが並列に鳴く。
start() ->
  spawn( process, call, [mii] ),  % モジュール、関数名, 引数の順に指定
  spawn( process, call, [tora] ),
  spawn( process, call, [shiro] ).

% ねこを呼ぶ
call( K ) -> call(K, 0).

call( K, 3 ) -> ok;
call( K, I  ) ->
  io:format( "~w : meow!~n", [K] ),
  call( K, I+1 ).

実行結果です。途中で表示されている「<0.51.0>」がstart()の戻り値(=spawn( process, call, [shiro] )の戻り値)でプロセスIDになります。

5> process:start().
mii : meow!
tora : meow!
shiro : meow!
<0.51.0>mii : meow!
tora : meow!
shiro : meow!

mii : meow!
tora : meow!
shiro : meow!

プロセス間通信

プロセス間通信は次の3つを使って行います。

  • receive構文
  • メッセージ送信演算子(!)
  • メッセージキュー
receive構文

receive構文はプロセスを「メッセージ待ち受け状態」にする構文です。以下の形式で書きます。

receive
  <パターン1> -> <処理>;
  <パターン2> -> <処理>;
  <パターン3> -> <処理>;
  <パターン4> -> <処理>
end.

プロセスはreceive構文にさしかかると、

  1. 自身の「メッセージキュー」からメッセージを取り出し、パターンとマッチするか評価します。
    • マッチするパターンがあった場合、メッセージがキューから削除されそれに続く処理が実行されます。
    • マッチするパターンが存在しない場合、メッセージはそのままでキューにある次のメッセージが評価されます。
    • メッセージがキューにない場合やすべてのメッセージがパターンとマッチしなかった場合、プロセスはブロックされ、メッセージ待ち状態になります。
メッセージ送信演算子(!)

メッセージ送信演算子(!)でプロセスにメッセージを送信します。使い方は次の通り。

<プロセスID> ! <メッセージ>

プロセスIDはspawn()関数から返されるものを指定します。メッセージには任意のアトムやタプルやリストが使えます。送信されたメッセージはプロセスのメッセージキューに格納され、このときプロセスがreceiveで待ち状態になっていれば処理が再開されることになります。

メッセージキュー

Erlangのすべてのプロセスはメッセージキューを持ちます。メッセージ送信演算子(!)でプロセスに送付されたメッセージはキューに格納されます。receiveでブロックしたプロセスはメッセージキューを監視して、クライアントからの要求を待ちます。

通信してみる

以下はプロセスを使った簡単なサーバーです。

  • 起動すると、クライアントからのメッセージを待つ状態になります。
  • 適切なメッセージが送付されると、一定のアクションを実行し、再度待ち状態になります。
  • sleepのアトムがメッセージとして送付されると、プロセスを終了します。
-module(kitten).
-export([start/1, run/1]).

% ねこサーバー。
% メッセージに応じて一定のアクションを行う。
start(Name) ->
  spawn( kitten, run, [Name] ).

run( Name ) ->
  % receiveでメッセージを待つ。
  % メッセージが送付され、条件にマッチすると、それに続く処理が実行される。
  receive
    run   -> io:format( "~w : run!~n", [Name] ),  run(Name); % 処理完了後再度待ち状態に遷移する。
    jump  -> io:format( "~w : jump!~n", [Name] ), run(Name);
    stop  -> io:format( "~w : stop~n", [Name] ),  run(Name);
    sleep -> ok;                                             % sleepで終了
  end.

実行してみます。まずはサーバーを起動。

3> P = kitten:start(mii).
<0.40.0>

接続してみます。

4> P ! run.
mii : run!
run
5> P ! jump.
mii : jump!
jump
6> P ! stop.
mii : stop
stop
7> P ! sleep.
sleep

おー。