I need some advice on how the chunk-based approach can be used with my ItemReader.
I have 200k or more records, which I consume from an API.
In the code below, suppose I set the page size to 100 and the chunk size to 100, and suppose I have only 233 records instead of 200k. The reader first returns 100, 100, and then 33 records. Only after the reader is completely done does the step move on to the processor, which then runs three times: for 100, 100, and 33 records.
So I want to know: if I set the chunk size to 100, is there a way to avoid waiting until the reader has read all 200k records before the processor runs? Can I send 100 records to the processor and writer, then come back to the reader, read the next 100, pass them to the processor and writer, and so on?
And if the REST API has millions of records, which approach is best for the reader?
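For context, a Spring Batch chunk-oriented step calls `read()` repeatedly until it has collected `chunkSize` items (or `read()` returns `null`), then runs the processor on those items, writes them as one chunk, and only then starts reading the next chunk. The chunk size counts the items returned by `read()`, not the records inside each item. The following is a simplified plain-Java model of that loop (a sketch, not Spring Batch's actual implementation; `ChunkLoopDemo` and `runStep` are hypothetical names). Fed three items that each wrap a batch of records, with a chunk size of 100, it reads all three before processing any of them, which matches the 100, 100, 33 behavior described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class ChunkLoopDemo {

    // Simplified model of a chunk-oriented step: read up to chunkSize items,
    // then process each item in the chunk, then write the whole chunk, and repeat.
    public static <T> List<String> runStep(Supplier<T> reader, int chunkSize) {
        List<String> log = new ArrayList<>();
        while (true) {
            List<T> chunk = new ArrayList<>();
            T item;
            // Stop reading when the chunk is full or the reader signals end of input (null).
            while (chunk.size() < chunkSize && (item = reader.get()) != null) {
                chunk.add(item);
                log.add("read " + item);
            }
            if (chunk.isEmpty()) {
                return log; // nothing left to read
            }
            for (T t : chunk) {
                log.add("process " + t);
            }
            log.add("write " + chunk.size() + " item(s)");
        }
    }

    public static void main(String[] args) {
        // Each "item" stands for a list of records, sized like the items in the question.
        Iterator<String> batches =
                List.of("[100 records]", "[100 records]", "[33 records]").iterator();
        Supplier<String> reader = () -> batches.hasNext() ? batches.next() : null;
        runStep(reader, 100).forEach(System.out::println);
    }
}
```

With one record per item and `runStep(reader, 2)`, the log instead interleaves: two reads, two processes, one write, then the next chunk.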
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;

@RequiredArgsConstructor
public class AssignTermsToEntityItemReader implements ItemReader<AssignTermEntityRequest> {

    private final NativeApiService nativeApiService;

    @Value("${hmdmg.default.page-size}")
    private Integer pageSize;

    @Value("${batch.entity-terms-assign-job.batchSize}")
    private Integer chunkSize;

    private int currentOffset = 0;
    private List<NativeEntity> accumulatedBatch = new ArrayList<>();

    // Each call returns ONE item that wraps up to chunkSize entities,
    // or null when the API has no more data.
    @Override
    public AssignTermEntityRequest read() {
        // Keep fetching pages until enough entities are buffered or the API returns an empty page.
        while (accumulatedBatch.size() < chunkSize) {
            List<NativeEntity> currentBatch = fetchNextBatch();
            if (currentBatch.isEmpty()) {
                break;
            }
            accumulatedBatch.addAll(currentBatch);
        }
        if (accumulatedBatch.isEmpty()) {
            return null; // signals end of input to Spring Batch
        }
        // Take up to chunkSize entities off the front of the buffer.
        List<NativeEntity> itemsToProcess =
                new ArrayList<>(accumulatedBatch.subList(0, Math.min(chunkSize, accumulatedBatch.size())));
        accumulatedBatch.subList(0, itemsToProcess.size()).clear();
        return AssignTermEntityRequest.builder().nativeEntities(itemsToProcess).build();
    }

    private List<NativeEntity> fetchNextBatch() {
        NativeSearchResult nativeSearchResult = searchHiveColumns(currentOffset);
        currentOffset += pageSize;
        // Guard against a null entity list (e.g. if the empty fallback result has no list initialized).
        List<NativeEntity> entities = nativeSearchResult.getEntities();
        return entities != null ? entities : Collections.emptyList();
    }

    private NativeSearchResult searchHiveColumns(int offset) {
        NativeSearchRequest searchRequestToRetrieveHiveColumns = createSearchRequestToRetrieveHiveColumns(offset);
        ResponseEntity<NativeSearchResult> nativeSearchResultResponseEntity =
                nativeApiService.searchHiveColumns(searchRequestToRetrieveHiveColumns);
        // Fall back to an empty result if the call returned no response or no body.
        return Optional.ofNullable(nativeSearchResultResponseEntity)
                .map(ResponseEntity::getBody)
                .orElse(new NativeSearchResult());
    }

    private NativeSearchRequest createSearchRequestToRetrieveHiveColumns(int offset) {
        return NativeSearchRequest.builder()
                .typeName(TYPE_NAME) // constant defined elsewhere in the project (not shown)
                .excludeDeletedEntities(true)
                .sortOrder("ASCENDING")
                .sortBy("name")
                .entityFilters(NativeEntityFilter.builder()
                        .condition(NativeSearchCondition.AND)
                        .criterion(Arrays.asList(
                                NativeSearchCriteria.builder()
                                        .attributeName("name")
                                        .operator("eq")
                                        .attributeValue("account_number")
                                        .build()))
                        .build())
                .offset(String.valueOf(offset))
                .limit(String.valueOf(pageSize))
                .build();
    }
}
```
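A pattern that often fits this situation is to have `read()` return a single `NativeEntity` per call and keep one API page buffered inside the reader, fetching the next page only when the buffer runs empty. The step's chunk size then alone decides how many entities reach the processor and writer per cycle, so each chunk is processed and written before the next one is read, and the reader holds at most one page at a time even for millions of records. Spring Batch ships `AbstractPaginatedDataItemReader` (subclasses implement `doPageRead()`, returning an `Iterator` over one page) built around the same idea. The sketch below is plain Java so it runs standalone, under these assumptions: `PagingEntityReader` and `PageFetcher` are hypothetical names, `PageFetcher` stands in for the `nativeApiService.searchHiveColumns(...)` call, and the class neither implements Spring Batch's `ItemReader` nor saves its offset for restarts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PagingEntityReader<T> {

    // Hypothetical stand-in for the REST call: returns at most `limit` records starting at `offset`.
    @FunctionalInterface
    public interface PageFetcher<E> {
        List<E> fetch(int offset, int limit);
    }

    private final PageFetcher<T> fetcher;
    private final int pageSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private int offset = 0;
    private boolean exhausted = false;
    private int apiCalls = 0;

    public PagingEntityReader(PageFetcher<T> fetcher, int pageSize) {
        this.fetcher = fetcher;
        this.pageSize = pageSize;
    }

    // Returns one record per call, or null at end of input
    // (the same contract as Spring Batch's ItemReader.read()).
    public T read() {
        if (buffer.isEmpty() && !exhausted) {
            List<T> page = fetcher.fetch(offset, pageSize);
            apiCalls++;
            if (page == null || page.isEmpty()) {
                exhausted = true;
            } else {
                buffer.addAll(page);
                offset += page.size();
                // A short page means the source has no more records.
                if (page.size() < pageSize) {
                    exhausted = true;
                }
            }
        }
        return buffer.poll(); // null once the buffer is empty and the source is exhausted
    }

    public int apiCalls() {
        return apiCalls;
    }

    public static void main(String[] args) {
        // Fake source of 233 records standing in for the REST API.
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 233; i++) source.add(i);
        PageFetcher<Integer> fake = (off, lim) ->
                source.subList(Math.min(off, source.size()), Math.min(off + lim, source.size()));
        PagingEntityReader<Integer> reader = new PagingEntityReader<>(fake, 100);
        int count = 0;
        while (reader.read() != null) count++;
        System.out.println(count + " records in " + reader.apiCalls() + " API calls");
    }
}
```

In a job, such a reader would be wired with `.chunk(100)`; the processor would then take one `NativeEntity`, and because the writer receives the whole chunk, it could group those entities into an `AssignTermEntityRequest` if the downstream call needs a batch. One caveat with offset paging: the search above filters on `name = account_number` and also sorts by `name`, so every match shares the same sort key; unless the API adds a unique tie-breaker, pages may not be stable and records could be skipped or repeated.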
I tried three chunk configurations:
page size less than the chunk size, page size greater than the chunk size, and page size equal to the chunk size.
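Assuming `read()` returns one record at a time and a new page is fetched only when the buffered page is used up (stopping at the first short or empty page), page size and chunk size stop interacting: page size only changes how many API calls are made, while chunk size alone sets how many records go through each process-and-write cycle. A small calculation under that assumption for 233 records and a chunk size of 100, across the three configurations tried (`PageVsChunk` and its methods are hypothetical names; this models such a reader, not the current code):

```java
public class PageVsChunk {

    // API calls made by a reader that stops at the first short or empty page:
    // one call per full page, plus one final call that returns a short or empty page.
    public static int apiCalls(int records, int pageSize) {
        return records / pageSize + 1;
    }

    // Number of chunks (process + write cycles) the step runs: ceil(records / chunkSize).
    public static int chunks(int records, int chunkSize) {
        return (records + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        int records = 233;
        int chunkSize = 100;
        // Page size less than, equal to, and greater than the chunk size.
        for (int pageSize : new int[] {50, 100, 250}) {
            System.out.printf("pageSize=%d chunkSize=%d -> %d API calls, %d chunks%n",
                    pageSize, chunkSize, apiCalls(records, pageSize), chunks(records, chunkSize));
        }
    }
}
```

For 233 records this gives 5, 3, and 1 API calls for page sizes 50, 100, and 250, and 3 chunks (100, 100, 33) in every case.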