無料で使えるシステムトレードフレームワーク「Jiji」 をリリースしました!

・OANDA Trade APIを利用した、オープンソースのシステムトレードフレームワークです。
・自分だけの取引アルゴリズムで、誰でも、いますぐ、かんたんに、自動取引を開始できます。

next()でのタイムアウトをサポートするイテレータ

next()でのタイムアウトをサポートするイテレータを作ってみました。

  • 通常のjava.util.Iteratorのラッパーとして使え、
  • 追加のAPIである「next (long timeout, TimeUnit unit)」を使うことで、タイムアウト指定ありの次要素取得が可能です。
    • タイムアウト有りの場合、一定時間経過しても移譲先イテレータのnext()から要素が取得できなければ、TimeoutExceptionをスローします。
    • 一定時間内に取得できれば、即座に要素を返します。

専用の先読みスレッドを使ってイテレータを読みつつ、取得できた分だけ順に返す/一定時間しても取得できなければタイムアウト、という仕組み。

注意事項

  • スレッドセーフではありません。
    • 読み出し(next())を複数スレッドから行った場合、待ち状態のまま停止する場合があります。
  • Iterator#remove() は未サポートです。

利用例

// リスト
List<String> test = Arrays.asList( new String[] {
    "aaa", "bbb", "ccc", "ddd"
} );

// 作成。引数で移譲先のイテレータを渡す。
TimeoutSupportIterator<String> it =
    new TimeoutSupportIterator<String>( test.iterator() );

while ( it.hasNext() ) {
    try {
        // タイムアウト有りで次の要素を取得
        String item = it.next( 1000, TimeUnit.MILLISECONDS );
    } catch ( TimeoutException e ) {
        // 1000ミリ秒待っても次の要素が取得できない場合、ここにくる。
    }
}

実装

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * データ取得のタイムアウトをサポートするイテレータ。
 * {@link Iterator#next()}で一定時間待っても要素が取得できなければ、タイムアウトします。
 *
 * @param <T> 要素の型
 *
 * @version $Revision:$
 * @author  $Author:$
 */
public class TimeoutSupportIterator<T>
implements Iterator<T> {

    /**読み込んだ要素を格納するキュー*/
    private final BlockingQueue<Object> queue;

    /**読み込み処理*/
    private final Prefetch<T> prefetch;
    /**引換券*/
    private final Future<Void> future;

    /**
     * コンストラクタ
     * @param iterator イテレータ
     */
    public  TimeoutSupportIterator( Iterator<T> iterator ) {
        this.queue = new LinkedBlockingQueue<Object>();
        this.prefetch = new Prefetch<T>( iterator, this.queue );
        ExecutorService service = Executors.newSingleThreadExecutor();
        future = service.submit( this.prefetch );
    }

    /**
     * 次の要素を取得します。
     * 一定時間経過後も要素が取得できない場合、タイムアウトします。
     *
     * @param timeout
     * @param unit
     * @return 次の要素
     * @throws TimeoutException 一定時間経過後も要素が取得できない場合
     * @throws InterruptedException スレッドで割り込みが発生した場合
     */
    public T next (long timeout, TimeUnit unit)
    throws TimeoutException, InterruptedException {
        if ( !hasNext() ) {
            throw new NoSuchElementException();
        }
        Object item = queue.poll( timeout, unit );
        if ( item == null ) {
            throw new TimeoutException();
        } else {
            return handleItem( item );
        }
    }

    /**
     * @see java.util.Iterator#next()
     */
    public T next () {
        Object item = null;
        while ( item == null ) {
            try {
                if ( !hasNext() ) {
                    throw new NoSuchElementException();
                }
                item = queue.take();
            } catch ( InterruptedException e ) {
                // 割り込みは無視。
            }
        }
        return handleItem( item );
    }

    /**
     * @see java.util.Iterator#hasNext()
     */
    public boolean hasNext () {
        return prefetch.hasNext();
    }

    /**
     * @see java.util.Iterator#remove()
     */
    public void remove () {
        throw new UnsupportedOperationException( "not implements." );
    }

    /**
     * イテレータの読み込みを停止します。
     * これを呼び出した後は、{@link #next()}または{@link #next(long, TimeUnit)}を実行してはいけません。
     * @return 読み込みが完了する前に取り消された場合は true
     */
    public boolean close () {
        return future.cancel( true );
    }

    /**
     * キューの要素を処理する。
     * @param item キューの要素
     * @return 結果
     */
    private final T handleItem( Object item ) {
        if ( item == NULL ) {
            return null;
        } else if ( item instanceof ExceptionHolder ) {
            throw ((ExceptionHolder) item).e;
        } else {
            return (T) item;
        }
    }

    /**
     * 先読み処理
     *
     * @version $Revision:$
     * @author  $Author:$
     */
    private static final class Prefetch<T> implements Callable<Void> {

        private final Iterator<T> iterator;
        private final BlockingQueue<Object> queue;
        private final Object monitor = new Object();

        private Prefetch( Iterator<T> iterator, BlockingQueue<Object> queue ) {
            this.iterator = iterator;
            this.queue = queue;
        }

        private boolean hasNext() {
            // キューに何かあるか、イテレータに未読み込みの要素があれば次がある。
            synchronized( monitor ) {
                return !queue.isEmpty() || iterator.hasNext();
            }
        }

        public Void call ()  {
            while ( true ) {
                // キューをさわる間、hasNext()と処理を同期化
                synchronized( monitor ) {
                    if ( iterator.hasNext() ) {
                        Object item = null;
                        try {
                            item = iterator.next();
                        } catch ( RuntimeException e ) {
                            // 例外は専用のオブジェクトに積んで渡す
                            item = new ExceptionHolder(e);
                        }
                        if ( item == null ) {
                            item = NULL; // nullはキューに詰めないので差し替えておく。
                        }
                        try {
                            queue.put( item );
                        } catch ( InterruptedException e ) {
                            break;
                        }
                    } else {
                        break;
                    }
                }
                if ( Thread.interrupted() ) {
                    break;
                }
            }
            return null;
        }
    }

    private static final Object NULL = new Object();
    private static final class ExceptionHolder {
        private final RuntimeException e;
        private ExceptionHolder( RuntimeException e ) {
            this.e = e;
        }
    }
}