next()でのタイムアウトをサポートするイテレータ
next()でのタイムアウトをサポートするイテレータを作ってみました。
- 通常のjava.util.Iteratorのラッパーとして使え、
- 追加のAPIである「next (long timeout, TimeUnit unit)」を使うことで、タイムアウト指定ありの次要素取得が可能です。
専用の先読みスレッドを使ってイテレータを読みつつ、取得できた分だけ順に返す/一定時間しても取得できなければタイムアウト、という仕組み。
利用例
// リスト List<String> test = Arrays.asList( new String[] { "aaa", "bbb", "ccc", "ddd" } ); // 作成。引数で移譲先のイテレータを渡す。 TimeoutSupportIterator<String> it = new TimeoutSupportIterator<String>( test.iterator() ); while ( it.hasNext() ) { try { // タイムアウト有りで次の要素を取得 String item = it.next( 1000, TimeUnit.MILLISECONDS ); } catch ( TimeoutException e ) { // 1000ミリ秒待っても次の要素が取得できない場合、ここにくる。 } }
実装
import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * データ取得のタイムアウトをサポートするイテレータ。 * {@link Iterator#next()}で一定時間待っても要素が取得できなければ、タイムアウトします。 * * @param <T> 要素の型 * * @version $Revision:$ * @author $Author:$ */ public class TimeoutSupportIterator<T> implements Iterator<T> { /**読み込んだ要素を格納するキュー*/ private final BlockingQueue<Object> queue; /**読み込み処理*/ private final Prefetch<T> prefetch; /**引換券*/ private final Future<Void> future; /** * コンストラクタ * @param iterator イテレータ */ public TimeoutSupportIterator( Iterator<T> iterator ) { this.queue = new LinkedBlockingQueue<Object>(); this.prefetch = new Prefetch<T>( iterator, this.queue ); ExecutorService service = Executors.newSingleThreadExecutor(); future = service.submit( this.prefetch ); } /** * 次の要素を取得します。 * 一定時間経過後も要素が取得できない場合、タイムアウトします。 * * @param timeout * @param unit * @return 次の要素 * @throws TimeoutException 一定時間経過後も要素が取得できない場合 * @throws InterruptedException スレッドで割り込みが発生した場合 */ public T next (long timeout, TimeUnit unit) throws TimeoutException, InterruptedException { if ( !hasNext() ) { throw new NoSuchElementException(); } Object item = queue.poll( timeout, unit ); if ( item == null ) { throw new TimeoutException(); } else { return handleItem( item ); } } /** * @see java.util.Iterator#next() */ public T next () { Object item = null; while ( item == null ) { try { if ( !hasNext() ) { throw new NoSuchElementException(); } item = queue.take(); } catch ( InterruptedException e ) { // 割り込みは無視。 } } return handleItem( item ); } /** * @see java.util.Iterator#hasNext() */ public boolean hasNext () { return prefetch.hasNext(); } /** * @see java.util.Iterator#remove() */ public void remove () { throw new UnsupportedOperationException( "not implements." ); } /** * イテレータの読み込みを停止します。 * これを呼び出した後は、{@link #next()}または{@link #next(long, TimeUnit)}を実行してはいけません。 * @return 読み込みが完了する前に取り消された場合は true */ public boolean close () { return future.cancel( true ); } /** * キューの要素を処理する。 * @param item キューの要素 * @return 結果 */ private final T handleItem( Object item ) { if ( item == NULL ) { return null; } else if ( item instanceof ExceptionHolder ) { throw ((ExceptionHolder) item).e; } else { return (T) item; } } /** * 先読み処理 * * @version $Revision:$ * @author $Author:$ */ private static final class Prefetch<T> implements Callable<Void> { private final Iterator<T> iterator; private final BlockingQueue<Object> queue; private final Object monitor = new Object(); private Prefetch( Iterator<T> iterator, BlockingQueue<Object> queue ) { this.iterator = iterator; this.queue = queue; } private boolean hasNext() { // キューに何かあるか、イテレータに未読み込みの要素があれば次がある。 synchronized( monitor ) { return !queue.isEmpty() || iterator.hasNext(); } } public Void call () { while ( true ) { // キューをさわる間、hasNext()と処理を同期化 synchronized( monitor ) { if ( iterator.hasNext() ) { Object item = null; try { item = iterator.next(); } catch ( RuntimeException e ) { // 例外は専用のオブジェクトに積んで渡す item = new ExceptionHolder(e); } if ( item == null ) { item = NULL; // nullはキューに詰めないので差し替えておく。 } try { queue.put( item ); } catch ( InterruptedException e ) { break; } } else { break; } } if ( Thread.interrupted() ) { break; } } return null; } } private static final Object NULL = new Object(); private static final class ExceptionHolder { private final RuntimeException e; private ExceptionHolder( RuntimeException e ) { this.e = e; } } }