I want to handle case when i can read/write to table in more than 1 thread.
Case is:
- Table size is 500 rows
- Thread read 50 rows, lock it, process data and then saving this 50 rows at 1 batch
- Second thread read NEXT 50 rows and do the same
…
- It goes until all 500 rows will be processed and saved.
But it seems like both threads ingoring locks (or locks opened after processing batch of 50 rows?) and process whole table per thread.
Look at the example:
I have table Cards with signature:
create table cards(
id serial ,
number int,
change_date timestamp,
check_field text
);
Then i filled it with 500 rows:
ThreadPoolTaskExecutor configuration:
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor setExecutorTasked(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(2);
threadPoolTaskExecutor.setMaxPoolSize(4);
threadPoolTaskExecutor.setThreadNamePrefix("THREAD-");
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
Repository configuration:
@Repository
public interface CardsRepository extends JpaRepository<Cards, Long> {
@Override
<S extends Cards> List<S> saveAll(Iterable<S> entities);
@Query(value = "select c from Cards c")
@Lock(LockModeType.PESSIMISTIC_WRITE)
@QueryHints({
@QueryHint(name = "jakarta.persistence.lock.timeout", value = LockOptions.SKIP_LOCKED + "")
})
Page<Cards> findAll(Pageable pageable);
}
Service that threads are using:
@Service
public class ProcessingService {
private final CardsRepository cardsRepository;
public ProcessingService(CardsRepository cardsRepository) {
this.cardsRepository = cardsRepository;
}
@Transactional
public void getAndProcessData() {
int pageSize = 50;
int page = 0;
int counter = 1;
Page<Cards> resultPage;
do {
PageRequest pageRequest = PageRequest.of(page, pageSize);
resultPage = cardsRepository.findAll(pageRequest);
for (Cards card: resultPage){
card.setCheckField("Run" + " Thread: " + Thread.currentThread().getName());
}
cardsRepository.saveAll(resultPage);
System.out.println("entities saved, iteration: " + counter + " ; Thread: " + Thread.currentThread().getName() );
counter++;
page++;
}while (resultPage.hasNext());
}
}
here i take 50 rows, make changes at entities and then save it. And do this until all 500 rows whould be processed.
@Service
@RequiredArgsConstructor
public class ProcessingServiceDecorator {
@Autowired
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private final ProcessingService processingService;
public void process() throws InterruptedException {
System.out.println("START OF HANDLE");
for(int i = 0; i < 2; i++) {
threadPoolTaskExecutor.execute(() -> processingService.getAndProcessData());
}
//threadPoolTaskExecutor.shutdown();
System.out.println("END OF HANDLE");
}
}
And final decorator to make data handling multithreaded.
But after run, logs look like:
entities saved, iteration: 1 ; Thread: THREAD-1
entities saved, iteration: 1 ; Thread: THREAD-2
entities saved, iteration: 2 ; Thread: THREAD-2
entities saved, iteration: 3 ; Thread: THREAD-2
entities saved, iteration: 4 ; Thread: THREAD-2
entities saved, iteration: 5 ; Thread: THREAD-2
entities saved, iteration: 6 ; Thread: THREAD-2
entities saved, iteration: 2 ; Thread: THREAD-1
entities saved, iteration: 7 ; Thread: THREAD-2
entities saved, iteration: 8 ; Thread: THREAD-2
entities saved, iteration: 9 ; Thread: THREAD-2
entities saved, iteration: 10 ; Thread: THREAD-2
entities saved, iteration: 3 ; Thread: THREAD-1
entities saved, iteration: 4 ; Thread: THREAD-1
entities saved, iteration: 5 ; Thread: THREAD-1
entities saved, iteration: 6 ; Thread: THREAD-1
entities saved, iteration: 7 ; Thread: THREAD-1
entities saved, iteration: 8 ; Thread: THREAD-1
entities saved, iteration: 9 ; Thread: THREAD-1
entities saved, iteration: 10 ; Thread: THREAD-1
so, each thread handle whole table, all 500 rows.
Where am i doing mistake?