RxJava: SubjectでonErrorを取り扱う時

結論

背景

onErrorをSubjectに流すと, Subjectはdispose状態になるため, これ以降のEventを流すことが出来なくなります.

sub.subscribe(
    v -> Log.d("onNext", String.valueOf(v)),
    e -> Log.d("onError", e.getMessage()));
sub.onNext(v1);
sub.onNext(v2);
sub.onError(e1);
sub.onNext(v3); // ignore
sub.onNext(v4); // ignore

これだと, Subjectがこれ以降に値を受け付けないため不便になることがあります.(ユースケース次第)

materialize + dematerializeオペレータを使うと, この問題を解決出来ます.

Observable.create((ObservableOnSubscribe<String>) e -> {
    e.onNext("1");
    e.onNext("2");
    e.onError(new RuntimeException("e1"));
}).materialize().subscribe(sub::onNext);
Observable.create((ObservableOnSubscribe<String>) e -> {
    e.onNext("3");
    e.onNext("4");
    e.onError(new RuntimeException("e2"));
}).materialize().subscribe(sub::onNext);

materialize を使い, ErrorイベントをNextイベントに変換します

そして, Subscribeするタイミングで, dematerializeでNextイベントをErrorイベントに戻します.

sub.dematerialize().subscribe(
    v -> Log.d("onNext", String.valueOf(v)),
    e -> Log.d("onError", e.getMessage()));

こうすることで, Subjectをdispose状態にせずに, ObserverでonErrorイベントを処理することが出来ます.

参照

Written by