読者です 読者をやめる 読者になる 読者になる
無料で使えるシステムトレードフレームワーク「Jiji」 をリリースしました!

・OANDA Trade APIを利用した、オープンソースのシステムトレードフレームワークです。
・自分だけの取引アルゴリズムで、誰でも、いますぐ、かんたんに、自動取引を開始できます。

同時実行抑制インターセプタ

Needleで使える同時実行抑制インターセプタを書きました。Mutex#synchronizedで同じIDを持つインターセプタが設定されたメソッド間の同時実行を抑制します。

require 'thread'
require 'set'

# 同時実行抑制インターセプタ
class SynchronizeInterceptor

  @@pool = {}
  @@pool_mutex = Mutex.new

  # コンストラクタ
  def initialize( point, options )
    @id = options[:id] || :default
    @mutex = mutex( @id )
  end

  def process( chain, context )
    # 2重ロックの回避
    set = Thread.current[:synchronize_interceptor_locked] ||= Set.new
    if set.include? @id
      chain.process_next( context )
    else
      set.add( @id )
      begin
        @mutex.synchronize {
          chain.process_next( context )
        }
      ensure
        set.delete( @id )
      end
    end
  end

  # IDに対応するmutexを取得する
  def mutex( id )
    @@pool_mutex.synchronize {
       @@pool[id] ||= Mutex.new
    }
  end

end

サンプルは以下。同じIDを持つ[:loop_x1, :loop_x2]、[:loop_y1, :loop_y2]の組み合わせの場合、コンポーネントメソッドの同時呼び出しが抑制され、「0,1,2,3,4,0,1,2,3,4,」と表示されます。(抑制されない場合は「0,0,1,1,2,2,3,3,4,4,」となります。)

# レジストリ
p = proc {
  5.times {|i| print "#{i},"; sleep 0.1 }
}
registry = Needle::Registry.define {|builder|
  builder.loop_no { p } # 同時実行制御をしないコンポーネント
  builder.loop_x1 { p } # id="x"で同時実行インターセプタを設定
  builder.loop_x2 { p } # id="x"で同時実行インターセプタを設定その2
  builder.loop_y1 { p } # id="y"で同時実行インターセプタを設定
  builder.loop_y2 { p } # id="y"で同時実行インターセプタを設定その2
}

# インターセプタを適用
registry.intercept( :loop_x1 ).with { SynchronizeInterceptor }.with_options( :id=>"x" )
registry.intercept( :loop_x2 ).with { SynchronizeInterceptor }.with_options( :id=>"x" )
registry.intercept( :loop_y1 ).with { SynchronizeInterceptor }.with_options( :id=>"y" )
registry.intercept( :loop_y2 ).with { SynchronizeInterceptor }.with_options( :id=>"y" )

# 同じIDを持つ[:loop_x1, :loop_x2]、[:loop_y1, :loop_y2]の組み合わせの場合、同時呼び出しが抑制される。
loops = [
  :loop_no, :loop_x1, :loop_x2, :loop_y1, :loop_y2
]
loops.each {|a|
  loops.each {|b|
    puts "\n\n---#{a} x #{b}"
    [registry[a],registry[b]].map {|x|
      Thread.fork(x) {|l| l.call }
    }.each{|t| t.value }
  }
}

実行結果です。

---loop_no x loop_no
0,0,1,1,2,2,3,3,4,4,

---loop_no x loop_x1
0,0,1,1,2,2,3,3,4,4,

---loop_no x loop_x2
0,0,1,1,2,2,3,3,4,4,

---loop_no x loop_y1
0,0,1,1,2,2,3,3,4,4,

---loop_no x loop_y2
0,0,1,1,2,2,3,3,4,4,

---loop_x1 x loop_no
0,0,1,1,2,2,3,3,4,4,

---loop_x1 x loop_x1
0,1,2,3,4,0,1,2,3,4,

---loop_x1 x loop_x2
0,1,2,3,4,0,1,2,3,4,

---loop_x1 x loop_y1
0,0,1,1,2,2,3,3,4,4,

---loop_x1 x loop_y2
0,0,1,1,2,2,3,3,4,4,

---loop_x2 x loop_no
0,0,1,1,2,2,3,3,4,4,

---loop_x2 x loop_x1
0,1,2,3,4,0,1,2,3,4,

---loop_x2 x loop_x2
0,1,2,3,4,0,1,2,3,4,

---loop_x2 x loop_y1
0,0,1,1,2,2,3,3,4,4,

---loop_x2 x loop_y2
0,0,1,1,2,2,3,3,4,4,

---loop_y1 x loop_no
0,0,1,1,2,2,3,3,4,4,

---loop_y1 x loop_x1
0,0,1,1,2,2,3,3,4,4,

---loop_y1 x loop_x2
0,0,1,1,2,2,3,3,4,4,

---loop_y1 x loop_y1
0,1,2,3,4,0,1,2,3,4,

---loop_y1 x loop_y2
0,1,2,3,4,0,1,2,3,4,

---loop_y2 x loop_no
0,0,1,1,2,2,3,3,4,4,

---loop_y2 x loop_x1
0,0,1,1,2,2,3,3,4,4,

---loop_y2 x loop_x2
0,0,1,1,2,2,3,3,4,4,

---loop_y2 x loop_y1
0,1,2,3,4,0,1,2,3,4,

---loop_y2 x loop_y2
0,1,2,3,4,0,1,2,3,4,