RxJava multicastについて

RxJavaには, stream上のアイテムを複数のSubscriberに渡せるmulticast(再利用のような感じ)の機能があります. multicastは主に「キャッシュとして使う」, イベントをが発火したときに複数で処理をおこなう」 として使います.

RxJavaでは2つの方法でmulticastを実現します.

また, RxJava2だとFlowableがありますが, Flowableの場合は Processorを使います.

説明だけだと分かりにくいので, サンプルコードも合わせてどうぞ.

ConnectableObservableについて

ConnectableObservableはpublishがコールされたタイミングで生成されます. ここがStreamの開始地点, 再利用する地点になります. なので, mapfilterなど, 共通の処理があるのなら, publishのUpstreamに書いて共通化したほうが効率が良いです.

ConnectableObservableはconnectメソッドがコールされたタイミングで, Streamの開始をします. 従来はSubscriberを登録した(subscribeメソッドをコールした)タイミングなので, 注意が必要です.

そして, publish + connectを組みわせることで1つのStreamに対して複数のSubscriberを登録することが出来ます.

Publish Connect

(source: https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators)

他にも, sharerefcountがありますが, 基本的な考え方は変わりません.

Subjectを使う

Subjectは, ObservableとObserverの機能を同時に使うことができる実装です. 1例としては, Observerの機能を使い, 複数のObservableからデータを取得し, Observableの機能を使いそのデータをObserverに渡す, いわゆるBridgeのような事ができます.

BehaviorSubject, ReplaySubjectなどのバラエティがあるので, 必要に応じて使い分ける必要があります. (https://github.com/ReactiveX/RxJava/wiki/subject)

まとめ

multicastを上手に使いこなすことで, データの取得回数を減らし, アプリがパフォーマンスに優れた構造になることが期待できます. そのためには, どこにmulticastを入れるか, どこからConnectableObservableにするか(publishをコールするか)を決定する必要があります. アプリごとに適切なポイント, 書き方は異なると思うので, サンプルを参考に頂けたら幸いです.

Happy RxJava2 life!!

参考

Written by
あんどろいどでぃべろっぱぁー🍎