読者です 読者をやめる 読者になる 読者になる
無料で使えるシステムトレードフレームワーク「Jiji」 をリリースしました!

・OANDA Trade APIを利用した、オープンソースのシステムトレードフレームワークです。
・自分だけの取引アルゴリズムで、誰でも、いますぐ、かんたんに、自動取引を開始できます。

MongoDBのinsert/updateをまとめて、bulk insert/update に流すユーティリティを書いた

Ruby

バッチ処理などでMongoDBに大量のinsert/updateを行うとき、Mongoidを使って1つずつ #save してると遅い。 ということで、複数#save をまとめて bulk insert/update に流すユーティリティを書いてみました。

使い方

  • モデルクラスで、Mongoid::DocumentUtils::BulkWriteOperationSupportinclude する。
  • Utils::BulkWriteOperationSupport.begin_transaction を呼び出してから、モデルの #save を呼び出す。
    • この時点ではMongoDBへのinsert/updateは行われず、バッファに蓄積されます。
  • Utils::BulkWriteOperationSupport.end_transaction を呼び出すと、バッファのデータが #bulk_write でまとめて永続化される。
class TestModel
  
  # Mongoid::Document と Utils::BulkWriteOperationSupport をincludeする
  include Mongoid::Document
  include Utils::BulkWriteOperationSupport

  store_in collection: 'test_model'

  field :name,      type: String

end

#略

puts TestModel.count # => 0

Utils::BulkWriteOperationSupport.begin_transaction

# #begin_transaction を呼び出したあと、モデルを作成/変更して、 #save を呼び出す。
a = TestModel.new
a.name = 'a'
b = TestModel.new
b.name = 'b'
a.save
b.save

# #end_transaction を実行するまで、永続化されない
puts TestModel.count # => 0

# #end_transaction を呼び出すと、バッファのデータが #bulk_write でまとめて永続化される。
Utils::BulkWriteOperationSupport.end_transaction

puts TestModel.count # => 2

ユーティリティのコード

  • Document#save を書き換えて、#begin_transaction#end_transaction の間であれば、スレッドローカルに永続化対象としてマーク。
  • #end_transaction が呼び出されたタイミンクで、まとめて #bulk_write で永続化します。
  • 参照整合性のチェックとか、いろいろ手抜きなので必要に応じて改造してください。
module BulkWriteOperationSupport

  KEY = BulkWriteOperationSupport.name

  def save
    if BulkWriteOperationSupport.in_transaction?
      BulkWriteOperationSupport.transaction << self
    else
      super
    end
  end

  def self.in_transaction?
    !transaction.nil?
  end

  def self.begin_transaction
    Thread.current[KEY] = Transaction.new
  end

  def self.end_transaction
    return unless in_transaction?
    transaction.execute
    Thread.current[KEY] = nil
  end

  def self.transaction
    Thread.current[KEY]
  end

  def create_insert_operation
    { :insert_one => as_document }
  end

  def create_update_operation
    {
      :update_one => {
        :filter => { :_id => id },
        :update => {'$set' => collect_changed_values }
      }
    }
  end

  private

  def collect_changed_values
    changes.each_with_object({}) do |change, r|
      r[change[0].to_sym] = change[1][1]
    end
  end

  class Transaction

    def initialize
      @targets = {}
    end

    def <<(model)
      targets_of( model.class )[model.object_id] = model
    end

    def execute
      until @targets.empty?
        model_class = @targets.keys.first
        execute_bulk_write_operations(model_class)
      end
    end

    def size
      @targets.values.reduce(0) {|a, e| a + e.length }
    end

    private

    def targets_of( model_class )
      @targets[model_class] ||= {}
    end

    def execute_bulk_write_operations(model_class)
      return unless @targets.include?(model_class)
      execute_parent_object_bulk_write_operations_if_exists(model_class)

      client = model_class.mongo_client[model_class.collection_name]
      operations = create_operations(@targets[model_class].values)
      client.bulk_write(operations) unless operations.empty?

      @targets.delete model_class
    end

    def execute_parent_object_bulk_write_operations_if_exists(model_class)
      parents = model_class.reflect_on_all_associations(:belongs_to)
      parents.each do |m|
        klass = m.klass
        execute_bulk_write_operations(klass)
      end
    end

    def create_operations(targets)
      targets.each_with_object([]) do |model, array|
        if model.new_record?
          model.new_record = false
          array << model.create_insert_operation
        else
          array << model.create_update_operation if model.changed?
        end
      end
    end

  end

end