Kotlin: CoroutineでRxJavaのzipっぽいものを表現する

Coroutineで非同期処理を並列に処理したいとします。例外を考慮しないなら単純にasyncで包めば良いです。

launch {
  val task1 = async { MainService.task1() }
  val task2 = async { MainService.task2() }

  println("${task1.await()}\n${task2.await()}")
}

asyncで包むことで、並列に処理をすることができます。

次に、各非同期処理が例外を吐く場合を考えてみます。その場合は、呼び出し元でrunCatchingを使います。

launch {
  val task1 = async { runCatching { MainService.task1() } }
  val task2 = async { runCatching { MainService.task2() } }

  val result1 = task1.await()
  val result2 = task2.await()
  println("$result1\n" + "$result2\n")
}

runCatchingを使うことで、呼び出し先で例外が起こったとしても、処理を継続することが出来ます。


また、次のように書くことは出来ません。

launch {
  val task1 = async { MainService.task1() }
  val task2 = async { MainService.task2() }

  // awaitのタイミングでrunCatchingを使う
  val result1 = runCatching { task1.await() }
  val result2 = runCatching { task2.await() }
  println("$result1\n" + "$result2\n")
}

デフォルトの設定だと例外は伝搬し、子が倒れたら親も倒れます。

しかし、supervisorScopeを使うと、上記の振る舞いを防ぐことができます。

launch {
  supervisorScope {
    val task1 = async { MainService.task1() }
    val task2 = async { MainService.task2() }

    // supervisorScopeを使っているのでこのタイミングでもおk
    val result1 = runCatching { task1.await() }
    val result2 = runCatching { task2.await() }
    println("$result1\n" + "$result2\n")
  }
}

supervisorScopeを使うと、Deferred.awaitのタイミングで例外のハンドリングをすることが出来ます。


最後にキャンセルについて考えてみます。

任意の子ジョブがキャンセルされても処理を続けたいとしたら、前述のsupervisorScopeを使う必要があります。

launch {
  supervisorScope {
    val task1 = async { MainService.task1() }
    val task2 = async { MainService.task2() }

    val result1 = runCatching { task1.await() }
    val result2 = runCatching { task2.await() }
    println("$result1\n" + "$result2\n")
  }
}

このように書くことで、例えばtask1がキャンセルされたとしても、他の部分の処理を継続することが出来ます。

まとめ

Written by
あんどろいどでぃべろっぱぁー🍎