スレッド間での処理の待ち合わせ
スレッド間で処理の完了を待ちあわせるには、ConditionVariableを利用します。ConditionVariableはJavaのウェイトセット(Object#wait,notify,notifyAllを使って行うアレ。)のような機能を提供するクラスで、これを利用してスレッド間での待ち合わせ機構(Guarded Suspentionパターンでいいのかな)が実現できます。
具体例
スレッドの待ち合わせが必要な例としてはワーカースレッドを作るときに使う「キュー」があります。
- 「キュー」はワーカースレッドが使う仕事が積まれる配列。
- 仕事を積むスレッドが定期的にキューに仕事を追加します。
- ワーカースレッドは ...
- キューに仕事があればそれを取り出して実行します。
- キューに仕事がなければ、キューに仕事が積まれるのを待ちます。仕事が積まれたら即座に処理を再開し、仕事を実行します。
ここで、「ワーカースレッド」は「仕事を積むスレッド」が仕事を積むのを「待ちあわせる」必要があります。
使い方
ConditionVariableはMutexと組み合わせて使います。使い方は以下のサンプルを参照。
require "thread" # ConditionVariableとMutex m = Mutex.new cv = ConditionVariable.new # キュー queue = [] # ワーカースレッドを10コ起動。 10.times {|i| Thread.new( "thread-" << i.to_s ){|name| # 一連の操作を同期化する。 # ワーカースレッドx10と仕事を積むスレッドでキューへのアクセスを同期化するため。 m.synchronize { # ワーカースレッドは、「キューが空でないか(ガード条件と呼ばれる)」をチェックし、条件を満たさない場合waitを実行する。 while ( queue.empty? ) # waitを呼び出したスレッドは、Mutexのロックを *解放* し、待ち状態に入る。 cv.wait(m) # ConditionVariable#broadcast or ConditionVariable#signal # が他のスレッドによって実行された場合、Mutexのロックを *再取得* してから # 処理が再開される。 # while で再度、ガード条件をチェックする。 # これは、処理が再開されたからと言ってガード条件が満たされているとは限らないからである。 # - キューが空なのにConditionVariable#broadcast or ConditionVariable#signalが実行された場合 # - ConditionVariable#broadcastで再開された別のワーカースレッドがキューを空にした場合 # などがあり得る。 end # ガード条件が満たされた場合ここに来る。 # キューから仕事を取り出して実行する。 # m.synchronize ブロックの中であるので、キューへのアクセスは安全。 queue.pop.call( name ) } } } 10.times {|i| sleep rand * 0.1 # 一連の操作を同期化する。 # ワーカースレッドx10と仕事を積むスレッドでキューへのアクセスを同期化するため。 m.synchronize { queue.push Proc.new {|name| # キューに仕事を積む。 puts name + " : " + i.to_s } # 待ち状態のスレッドを起こす。 cv.broadcast # ConditionVariable#signalでも良いが、ガード条件や待ちスレッドが複雑化した場合、 # cv.broadcastでとりあえず全部たたき起こしておいて、スレッドごとに個別に条件を再チェックする # ようにした方が実装が楽らしい。→ 「TECHSCORE - Java - マルチスレッドプログラミング - スレッド間の待ち合わせ」(http://www.techscore.com/tech/J2SE/Thread/5-2.html )より。 } } sleep 1 # ワーカースレッドが最後の仕事を終えるのを待つ。
実行結果です。
thread-0 : 0 thread-1 : 1 thread-2 : 2 thread-3 : 3 thread-4 : 4 thread-5 : 5 thread-9 : 6 thread-7 : 7 thread-6 : 8 thread-8 : 9