I have the following common code for “subscribing” to a Kotlin coroutine Flow that is used throughout my Android application…
fun <T> subscribe(
myFlow: Flow<Conclusion<T>>,
success: suspend (T) -> Unit,
error: suspend (Throwable?) -> Unit) =
collect(myFlow) { conclusion ->
conclusion.onSuccess { success -> success(success) }
.onError { error -> error(error) }
}
protected fun <T> collect(flow: Flow<T>, onEach: suspend (T) -> Unit ) {
viewModelScope.launch {
withContext(Dispatchers.IO) {
flow.filterNotNull()
.onEach { onEach(it) }
.catch { onError(it) }
.launchIn(this)
}
}
}
currently this common code code does not support cancellation,
my initial attempts to fix this are as follows:-
fun <T> subscribe(
myFlow: Flow<Conclusion<T>>,
success: suspend (T) -> Unit,
error: suspend (Throwable?) -> Unit) =
collect(myFlow.cancellable()) /* CHANGE HERE 1 */ { conclusion ->
conclusion.onSuccess { success -> success(success) }
.onError { error -> error(error) }
}
protected fun <T> collect(flow: Flow<T>, onEach: suspend (T) -> Unit ) {
viewModelScope.launch {
withContext(Dispatchers.IO) {
var theJobToCancel: Job? = currentCoroutineContext().job /* CHANGE HERE 2 */
flow.filterNotNull()
.onEach {
currentCoroutineContext().ensureActive() /* CHANGE HERE 3 */
onEach(it)
}
.catch { onError(it) }
.launchIn(this)
}
}
}
I now need to be able to communicate with the collect
function to trigger cancellation of the active collection process via onEach
.
theJobToCancel
is the job i need to cancel, however i cannot see how to achieve this
How can i cancel onEach
via theJobToCancel
for out side this common code block?