I have code that scans rows from a database and emits them in batches through a producer channel. I want to process the batches concurrently as new ones come in. I also want to watch for errors and, if any occur, cancel both the producer channel and the data processor. Here is my sample code:
import java.sql.ResultSet
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(args: Array<String>) {
    println("Starting runBlocking.")
    try {
        runBlocking {
            // produceData is my producer channel
            val data = produceData()
            var count = 0
            data.consumeEach { batch ->
                yield()
                count++
                if (count >= 10) {
                    println("Reached 10th batch, cancelling...")
                    // cancel the runBlocking scope here
                    cancel()
                }
            }
        }
    } catch (e: Throwable) {
        println("Exception encountered.")
        println(e.stackTraceToString())
    }
    println("After runBlocking.")
}
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceData(): ReceiveChannel<List<Data>> {
    return produce {
        db.connection.use { conn ->
            conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY).use { stmt ->
                stmt.fetchSize = Int.MIN_VALUE
                val sql = // omitted working SQL
                stmt.executeQuery(sql).use { rs ->
                    val batch = mutableListOf<Data>()
                    while (rs.next()) {
                        // Handle result set here
                        val data = Data(...) // omitted model mapper
                        batch.add(data)
                        if (batch.size >= 100) {
                            // Send a copy so that clear() does not empty the list the consumer holds
                            send(batch.toList())
                            batch.clear()
                        }
                    }
                    // Send any remaining rows that did not fill a whole batch
                    if (batch.isNotEmpty()) send(batch.toList())
                }
            }
        }
    }
}
The current behavior is that I get the following output:
Starting runBlocking.
Reached 10th batch, cancelling...
After that, it just hangs. I know the database code is working. I want to see “After runBlocking” to show that I’m able to exit the runBlocking correctly.
What am I doing wrong here? I’ve tried saving the coroutineContext from runBlocking and cancelling that, cancelling just its children, etc. I’ve tried adding yield() calls to both the producer and the consumer. How can I, from within the runBlocking { ... } block, cancel the producer channel and consumer such that the runBlocking ends?