RxJava: Connectable Observableについて

結論

基本的な使い方

同じデータを必要としているObserverがあるとします. これを単純に書くと以下のようになります.

Obervable<User> getUsers() { return Observalbe.just(User1, User2, User3);}

Obervable<User> users = getUsers();
users.subscribe(o1);
users.subscribe(o2);
users.subscribe(o3);

これだと, subscribeされるたびに新しいStreamが作られてしまいます. 上の例だと3つの独立したObservableが作成されてしまい, 非常に効率が悪いです.

Connectable Observableを使うことで, 1つのStreamからsubscribeすることが出来ます.

ConnectableObservable<User> users = getUsers().publish();
users.subscribe(o1);
users.subscribe(o2);
users.subscribe(o3);
users.connect(); // 重要!!

publishオペレータを適用することで, Connectable Observableに変換出来ます. そして, Connectable Observableに対してObserverをsubscribeし, 最後にconnectをコールすることで, 共通のStreamからsubscribeすることが出来ます.

流れとしては以下のようになります.

  1. publishをコールすると, Connectable Observableが作成出来る
  2. Connectable ObservableをいくつかのObserverでsubscribeする
    • ここではStreamは開始されず, 登録のみされる
  3. 最後にconnectをコールする
    • Streamが開始される. 登録したObserverに対して共通の値がemitされていく

これが基本的なConnectable Observableの使い方になります.

まとめ

publish + connectは, Connectable Observableの中でも一番ポピュラーなものです. RxJavaには他にも, replaycacheといったConnectable ObservableのファミリーAPIがあります. これらは, ユースケースによってはConnectable Observableをより便利に使うことが出来るものです.

これらについては, また他の記事で紹介いたします.

最後まで読んでいただきありがとうございました!

参照

Written by