Kotlin: FlowのBufferの指定について

CoroutineのFlowにはBufferを指定することができます。Bufferの指定によってどのように動作が変わるかを試してみました。

この記事は、少しFlowを触ったことあるぞ!って言う人向けになります。また、検証はCoroutine 1.3.0-RCで行っております。

Bufferの種類

Bufferの種類は5つあります。

検証コード

// flowストリームの作成
val f = flow {
  repeat(100) {
    delay(1)
    emit("$it")
  }
}

// observeする
lifecycleScope.launch {
  f
    .buffer(...) ここでBufferを指定する
    .collect {
      delay(100)
      Log.d("t4", "$it")
    }
}

このコードは、まず最初に、flowメソッドを使って、Flowストリームを作ります。これをこの記事ではProducer(生産者)と呼びます。このProducerは1つの値を送るのに約delay(1)かかります。なので、処理に約1msかかります。

次に、作ったflowをobserveします。observeする側をConsumer(消費者)と呼びます。Consumerではdelay(100)しているので、処理に約100msかかります。

これは、次の関係を作り出しています

この状態で、Bufferを指定したらどのように挙動が変わるかを見ていきます。

UNLIMITED

f
  .buffer(Channel.UNLIMITED)
  .collect {
    delay(100)
    Log.d("t4", "$it")
  }

UNLIMITEDはBufferのサイズを無制限にします。

Consumer側で処理が終わってなくても値を送ります。その値は、Consumerが現在のタスクをしていたら、flowの内部で一旦保存されます。そして、Consumerの現在のタスクが完了したら、内部で保存したデータが1つずつ流れてきます。

RENDEZVOUS

f
  .buffer(Channel.RENDEZVOUS)
  .collect {
    delay(100)
    Log.d("t4", "$it")
  }

RENDEZVOUSは協調的に動かすためのBufferです。

Consumer側が処理出来るタイミングで、値を送ってきます。Consumserが現在のタスクをしていたら、一時停止します。

CONFLATED

f
  .buffer(Channel.CONFLATED) // or conflate()
  .collect {
    delay(100)
    Log.d("t4", "$it")
  }

CONFLATEDはBufferに値が溜まっていたら、その値を上書きするBufferです。

最新の値は取得できますが、古い値は削除されてしまいます。

BUFFERED

f
  .buffer(Channel.BUFFERED) // or 100などの具体的な値
  .collect {
    delay(100)
    Log.d("t4", "$it")
  }

BUFFEREDは最大16個までキャッシュすることが出来るBufferです。

バッファが満杯になったら、バッファの値が消費されるまで、Producerは一時停止します。

まとめ

Written by