Implemented batch jobs using Spring Batch. However, I encountered OOM during the batch operation, and the heap memory graph looks like this.
Heap memory graph
There are also memory snapshots.
Memory snapshot
@Tasklet
class SynchronizeStudentsTasklet(
private val customStudentRepository: CustomStudentRepository,
private val studentClient: StudentClient,
private val lectureClient: LectureClient,
) : ItemTasklet<Mono<Student>> {
private val logger = getLogger()
private val years = (2018..Year.now().value).toList()
private var number = 100000
private var index = 0
override fun read(): Mono<Student>? =
years.getOrNull(index)
?.let { year ->
if (number >= 102000) {
index += 1
number = 100000
}
"$year${number++}"
}
?.let {
studentClient.getStudentByNumber(it)
.flatMap { student ->
Mono.zip(
lectureClient.getAppliedLectureIdsByStudentNumber(it)
.collectList(),
lectureClient.getPreAppliedLectureIdsByStudentNumber(it)
.collectList()
).map { (appliedLectureIds, preAppliedLectureIds) ->
student.copy(
appliedLectureIds = appliedLectureIds.toHashSet(),
preAppliedLectureIds = preAppliedLectureIds.toHashSet()
)
}
}
}
override fun write(chunk: Chunk<out Mono<Student>>) {
Flux.concat(chunk.items)
.flatMap { customStudentRepository.upsert(it) }
.doOnNext { logger.info { it } }
.collectList()
.block()
}
}
Batch operations use chunk-based batching, and read() makes 6 HTTP requests (concurrency enforced, WebClient).
Looking at this, can we be sure that the cause of OOM is a memory leak?
Using netty.leakDetection.level=advanced
resulted in the message LEAK: ByteBuf.release() was not called before garbage collection
. However, we did not use low-level ByteBuf, only WebClient.
Sangyoon Jeong is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.