I have Spring Boot v2.7.3 with Caffeine cache v2.9.3.
I just noticed that while the load() function (triggered from get()) runs on an async thread, loadAll() (triggered from getAll()) runs only on the main thread. Here are my configurations:
- Thread configuration:

    @EnableAsync
    @Configuration
    @RequiredArgsConstructor
    public class ThreadConfiguration {

        private final Config config;
        // ...

        @Bean(name = "caffeineAsyncCacheExecutor")
        public TaskExecutor caffeineAsyncCacheExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setThreadNamePrefix("CaffeineAsyncCacheExecutor-");
            if (config.getAwaitTimeBeforeShutdown() > 0) {
                executor.setWaitForTasksToCompleteOnShutdown(true);
                executor.setAwaitTerminationMillis(config.getAwaitTimeBeforeShutdown());
            }
            executor.initialize();
            return executor;
        }
    }
- Service and cache configuration:

    @Service
    public class MyService {

        private LoadingCache<BigDecimal, Object> serviceCache;
        private final TaskExecutor caffeineAsyncCacheExecutor;
        private final Long refreshAfterWriteInMillis;
        private final AsyncCacheLoader asyncCacheLoader;

        @Autowired
        public MyService(TaskExecutor caffeineAsyncCacheExecutor,
                         AsyncCacheLoader asyncCacheLoader,
                         @Value("${...}") Long refreshAfterWriteInMillis) {
            this.caffeineAsyncCacheExecutor = caffeineAsyncCacheExecutor;
            this.asyncCacheLoader = asyncCacheLoader;
            this.refreshAfterWriteInMillis = refreshAfterWriteInMillis;
        }

        @PostConstruct
        public void init() {
            serviceCache = Caffeine.newBuilder()
                    .refreshAfterWrite(Duration.ofMillis(refreshAfterWriteInMillis))
                    .executor(caffeineAsyncCacheExecutor)
                    .build(asyncCacheLoader);
            initializeCache();
        }

        private void initializeCache() {
            try {
                List<Long> preEvents = ...;
                if (preEvents.isEmpty()) {
                    log.info("There are no active events for cache ...");
                    return;
                }
                // map the Long ids to the BigDecimal keys used by the cache
                List<BigDecimal> activeEventIds = preEvents.stream()
                        .map(BigDecimal::valueOf)
                        .collect(Collectors.toList());
                serviceCache.getAll(activeEventIds);
            } catch (Exception ex) {
                log.error("Exception while initializing cache", ex);
            }
        }

        public Object getEvent(BigDecimal eventId) {
            // this get() call runs on a CaffeineAsyncCacheExecutor- thread
            return serviceCache.get(eventId);
        }

        public List<Object> getEvents() {
            List<Long> preEvents = ...;
            List<BigDecimal> activeEventIds = preEvents.stream()
                    .map(BigDecimal::valueOf)
                    .collect(Collectors.toList());
            // this getAll() call runs on the main thread
            Map<BigDecimal, Object> allEvents = serviceCache.getAll(activeEventIds);
            return new ArrayList<>(allEvents.values());
        }
    }
- Finally, here is my AsyncCacheLoader:

    @Component
    public class AsyncCacheLoader implements CacheLoader<BigDecimal, Object> {

        private final MyFeignClient feignClient;

        @Override
        public @Nullable Object load(@NonNull BigDecimal eventId) {
            // Thread.currentThread().getName() => prints CaffeineAsyncCacheExecutor-1
            Object notFound = ...;
            try {
                Object response = feignClient.getEvent(eventId);
                return Objects.requireNonNullElse(response, notFound);
            } catch (Exception ex) {
                log.error("load ", ex);
                return notFound;
            }
        }

        @Override
        public @NonNull Map<@NonNull BigDecimal, @NonNull Object> loadAll(
                @NonNull Iterable<? extends @NonNull BigDecimal> eventsFromCaffeine) throws Exception {
            // Thread.currentThread().getName() => prints main
            List<BigDecimal> eventIds = new ArrayList<>();
            eventsFromCaffeine.forEach(eventIds::add);
            List<List<BigDecimal>> byPartitions = Lists.partition(eventIds, PARTITION_SIZE);
            Map<BigDecimal, Object> bulkResponse = new HashMap<>();
            for (List<BigDecimal> partition : byPartitions) {
                int page = 0;
                SimplePageResponse<Object> response;
                do {
                    try {
                        response = feignClient.getEvents(partition, page, PARTITION_SIZE);
                        response.getContent().forEach(item -> {
                            bulkResponse.put(item.getEventId(), item);
                        });
                        page = response.getPage() + 1;
                    } catch (Exception ex) {
                        log.error("loadAll", ex);
                        // empty page with a negative totalPages so the do/while loop stops
                        response = SimplePageResponse.<Object>builder()
                                .content(List.of())
                                .totalPages(Integer.MIN_VALUE)
                                .totalElements(0L)
                                .page(0)
                                .size(0)
                                .build();
                    }
                } while (page < response.getTotalPages());
            }
            return bulkResponse;
        }
    }
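
To make the difference I am describing concrete without Spring or Caffeine, here is a minimal sketch that uses only java.util.concurrent. `BulkLoader`, `LoadThreadDemo` and the `demo-executor-` thread prefix are hypothetical names made up for this illustration; none of them is Caffeine API, and the sketch does not claim to show what Caffeine does internally. It only illustrates the general point that a loader invoked directly runs on the calling thread, whereas a loader explicitly handed to an executor runs on one of the pool threads.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a bulk loader; this is NOT Caffeine's CacheLoader.
interface BulkLoader<K, V> {
    Map<K, V> loadAll(List<K> keys);
}

class LoadThreadDemo {

    // Returns the thread name observed for a direct call ("direct")
    // and for a call handed to an executor ("dispatched").
    static Map<String, String> run() throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(2, task -> {
            Thread thread = new Thread(task, "demo-executor-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });

        // The loader records, for every key, the name of the thread that executed it.
        BulkLoader<Integer, String> loader = requested -> {
            Map<Integer, String> result = new LinkedHashMap<>();
            for (Integer key : requested) {
                result.put(key, Thread.currentThread().getName());
            }
            return result;
        };

        List<Integer> keys = List.of(1, 2, 3);
        Map<String, String> observed = new LinkedHashMap<>();
        try {
            // Direct call: the loader runs on whichever thread invokes it.
            observed.put("direct", loader.loadAll(keys).get(1));

            // Explicit hand-off: the loader runs on a pool thread; the caller only waits.
            observed.put("dispatched",
                    CompletableFuture.supplyAsync(() -> loader.loadAll(keys), executor)
                            .get()
                            .get(1));
        } finally {
            executor.shutdown();
        }
        return observed;
    }

    public static void main(String[] args) throws Exception {
        run().forEach((mode, thread) -> System.out.println(mode + " -> " + thread));
    }
}
```

The "direct" entry shows the name of the thread that called `run()`, and the "dispatched" entry shows a name starting with `demo-executor-`, which mirrors the two thread names I see in my load() and loadAll() logs.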