This series of articles discusses Kotlin flows. This article covers flow cancellation.
There are two ways to cancel a flow:
1. Cancel the coroutine job
// cancel the job
val job = launch {
    sampleFlow()
        .onCompletion { cause ->
            if (cause is CancellationException) {
                println("Flow cancelled Manually by job ")
            }
        }
        .collect {
            println("Type 1 Received $it")
        }
}
delay(550)
job.cancel()
/**
* Output :
* Type 1 Received 1
* Type 1 Received 3
* Flow cancelled Manually by job
*/
2. Cancel from inside the coroutine scope
// cancel the coroutine from inside its own scope
launch {
    sampleFlow()
        .onCompletion { cause ->
            if (cause is CancellationException) {
                println("Flow cancelled Manually in scope")
            }
        }.collect {
            println("Type 2 Received $it")
            if (it == 3) {
                cancel()
            }
        }
}.join()
/**
* Output :
* Type 2 Received 1
* Type 2 Received 3
* Flow cancelled Manually in scope
*/
The code gist for both types of cancellation can be found at https://gist.github.com/vprabhu/ebce4cf922f50325f3f82b400681c0ac
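The snippets above call sampleFlow() without showing it. As a rough, self-contained sketch of the first approach, the version below assumes a stand-in sampleFlow() that emits 1, 3 and 5 with a 200 ms pause before each value, and cancels the job after 500 ms. The timings, the log list and the helper name collectThenCancelJob are illustrative assumptions, not the code from the gist.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Assumed stand-in for sampleFlow(): emits 1, 3, 5 with a 200 ms pause before each value.
fun sampleFlow(): Flow<Int> = flow {
    for (value in listOf(1, 3, 5)) {
        delay(200) // delay() is a cancellable suspending function
        emit(value)
    }
}

// Collects sampleFlow() in a child job, cancels the job after 500 ms,
// and returns a log of what happened (hypothetical helper for illustration).
fun collectThenCancelJob(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        sampleFlow()
            .onCompletion { cause ->
                if (cause is CancellationException) {
                    log.add("Flow cancelled by job")
                }
            }
            .collect { log.add("Received $it") }
    }
    delay(500)
    job.cancelAndJoin() // cancel the job, then wait until it has finished
    log
}

fun main() {
    collectThenCancelJob().forEach(::println)
}
```

With these assumed timings the third value would only arrive at about 600 ms, so the job is cancelled while the flow is suspended in delay().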
Cooperative Cancellation
All suspending functions in kotlinx.coroutines are cancellable. They check for coroutine cancellation internally and throw CancellationException when cancelled. But if a coroutine does not check for cancellation while it is working, it cannot be cancelled.
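A small sketch can make the difference visible. It assumes a hypothetical helper, stepsCompleted, that runs a five-step loop with no suspension points. The coroutine cancels itself at the third step purely to keep the demo deterministic, and the flag decides whether the loop checks for cancellation.

```kotlin
import kotlinx.coroutines.*

// Runs a 5-step loop in a child coroutine and returns how many steps finished.
// Cancellation is requested at the third step (i == 2); the loop only notices it
// when checkCancellation is true. Hypothetical helper for illustration.
fun stepsCompleted(checkCancellation: Boolean): Int = runBlocking {
    var steps = 0
    val job = launch {
        repeat(5) { i ->
            if (i == 2) cancel()                  // request cancellation of this coroutine
            if (checkCancellation) ensureActive() // throws CancellationException once cancelled
            steps++
        }
    }
    job.join()
    steps
}

fun main() {
    println(stepsCompleted(checkCancellation = false)) // 5: the loop never looks at the job state
    println(stepsCompleted(checkCancellation = true))  // 2: the loop stops at the first check after cancel()
}
```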
Behaviour of flowOf() when cancelling
flowOf() doesn't check internally whether the coroutine is active:
launch {
    flowOf(1, 3, 5)
        .onCompletion { cause ->
            if (cause is CancellationException) {
                println("Flow cancelled $cause")
            }
        }
        .collect {
            println("Received $it")
            if (it == 3) {
                cancel()
            }
        }
}.join()
/**
 * output :
 * Received 1
 * Received 3
 * Received 5
 */
In the above code, even though the coroutine is cancelled inside the collect {} block when the value is 3, the flow does not stop and 5 is still received. There are two ways to make the flow check for cancellation:
1. ensureActive() in the onEach() block
launch {
    flowOf(1, 3, 5)
        .onCompletion { cause ->
            if (cause is CancellationException) {
                println("Flow cancelled $cause")
            }
        }.onEach {
            ensureActive()
        }
        .collect {
            println("Received $it")
            if (it == 3) {
                cancel()
            }
        }
}.join()
/**
* output :
* Received 1
* Received 3
* Flow cancelled kotlinx.coroutines.JobCancellationException:
* StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@635a732e
*/
2. cancellable()
launch {
    flowOf(1, 3, 5)
        .onCompletion { cause ->
            if (cause is CancellationException) {
                println("Flow cancelled $cause")
            }
        }
        .cancellable()
        .collect {
            println("Received $it")
            if (it == 3) {
                cancel()
            }
        }
}.join()
/**
* output :
* Received 1
* Received 3
* Flow cancelled kotlinx.coroutines.JobCancellationException:
* StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@635a732e
*/
The code gist for both ways of making flowOf() cancellable can be found at https://gist.github.com/vprabhu/1a9defc5e8decffc619b5ed746b43d17
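For a side-by-side comparison, here is a compact sketch that runs the same flowOf(1, 3, 5) with no check, with cancellable(), and with an onEach check. The helper receivedAfterCancelAt3 is a hypothetical name. The onEach variant uses currentCoroutineContext().ensureActive(), which should behave like the ensureActive() call above but does not need an enclosing CoroutineScope receiver.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects flowOf(1, 3, 5) after applying `transform`, cancels the collecting
// coroutine when 3 arrives, and returns the values that were received.
// Hypothetical helper for illustration.
fun receivedAfterCancelAt3(transform: (Flow<Int>) -> Flow<Int>): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    launch {
        transform(flowOf(1, 3, 5)).collect {
            received.add(it)
            if (it == 3) cancel()
        }
    }.join()
    received
}

fun main() {
    // No cancellation check: all three values arrive.
    println(receivedAfterCancelAt3 { it })
    // cancellable() checks before each emission: collection stops after 3.
    println(receivedAfterCancelAt3 { it.cancellable() })
    // An explicit check in onEach has the same effect.
    println(receivedAfterCancelAt3 { it.onEach { currentCoroutineContext().ensureActive() } })
}
```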
When a flow does something intensive between emissions, we can call ensureActive() inside that work so that it notices cancellation and stops. Let me know if you would like sample code.
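A minimal sketch of that idea, under stated assumptions: heavySum is a made-up CPU-heavy flow that emits only once at the end, so no emit() call checks for cancellation while the loop runs. The onStep callback and the self-cancellation at step 10 exist only to keep the demo deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical CPU-heavy flow: sums 1..n and emits only the final total.
// The explicit check is what lets the loop react to cancellation.
fun heavySum(n: Int, onStep: (Int) -> Unit): Flow<Long> = flow {
    var total = 0L
    for (i in 1..n) {
        currentCoroutineContext().ensureActive() // throws CancellationException once cancelled
        total += i
        onStep(i)
    }
    emit(total)
}

// Collects heavySum(1_000), cancels the collecting coroutine at step `cancelAt`,
// and returns the last step that ran.
fun lastStepBeforeCancellation(cancelAt: Int): Int = runBlocking {
    var lastStep = 0
    launch {
        heavySum(1_000) { step ->
            lastStep = step
            if (step == cancelAt) cancel() // self-cancel to keep the demo deterministic
        }.collect { println("Total: $it") }
    }.join()
    lastStep
}

fun main() {
    println(lastStepBeforeCancellation(10)) // the loop stops at the next check after step 10
}
```

Without the ensureActive() line the loop would run through all 1,000 steps before the cancellation had any visible effect.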
In the next article, we will discuss Cold Flow and Hot Flow.
Please leave your comments so we can improve and discuss further.
Happy coding!