I am using Spring Boot v2.7.3 with Caffeine cache v2.9.3.
I just noticed that while the load() function (triggered from get()) runs on an async thread, loadAll() (triggered from getAll()) runs only on the main thread. Here are my configurations:
- Thread Configuration
/**
 * Spring configuration that declares the task executor handed to the Caffeine cache builder.
 */
@EnableAsync
@Configuration
@RequiredArgsConstructor
public class ThreadConfiguration {
// Source of the shutdown await time read below (declaration is elided in this snippet).
private final Config config...
/**
 * Creates the executor bean registered under the name "caffeineAsyncCacheExecutor".
 *
 * @return an initialized {@link ThreadPoolTaskExecutor} with core pool size 10, max pool size 20
 *         and thread names prefixed with "CaffeineAsyncCacheExecutor-"
 */
@Bean(name = "caffeineAsyncCacheExecutor")
public TaskExecutor caffeineAsyncCacheExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setThreadNamePrefix("CaffeineAsyncCacheExecutor-");
// Only opt in to waiting for in-flight tasks on shutdown when a positive await time is configured.
if (config.getAwaitTimeBeforeShutdown() > 0) {
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationMillis(config.getAwaitTimeBeforeShutdown());
}
// Initialized explicitly here before the instance is returned as a bean.
executor.initialize();
return executor;
}
}
- Service and cache configuration:
/**
 * Serves events through a Caffeine {@code LoadingCache} keyed by event id.
 *
 * <p>The cache is built in {@link #init()} with a refresh-after-write interval and the injected
 * executor, and is warmed up once with the currently active event ids.
 *
 * <p>NOTE(review): per the Caffeine documentation, a synchronous {@code LoadingCache} invokes the
 * loader on the thread that calls {@code get()}/{@code getAll()}; the executor given to the builder
 * is used for asynchronous work such as {@code refreshAfterWrite} reloads. If initial loads (both
 * {@code load} and {@code loadAll}) must run on the executor, the cache would have to be built with
 * {@code buildAsync(...)} instead — confirm against the Caffeine version in use.
 */
@Service
public class MyService {
    private LoadingCache<BigDecimal, Object> serviceCache;
    private final TaskExecutor caffeineAsyncCacheExecutor;
    private final Long refreshAfterWriteInMillis;
    private final AsyncCacheLoader asyncCacheLoader;

    /**
     * @param caffeineAsyncCacheExecutor executor passed to the Caffeine builder
     * @param asyncCacheLoader loader used to populate the cache
     * @param refreshAfterWriteInMillis refresh-after-write interval in milliseconds
     */
    @Autowired
    public MyService(TaskExecutor caffeineAsyncCacheExecutor,
            AsyncCacheLoader asyncCacheLoader,
            @Value("${...}") Long refreshAfterWriteInMillis) {
        this.caffeineAsyncCacheExecutor = caffeineAsyncCacheExecutor;
        this.asyncCacheLoader = asyncCacheLoader;
        this.refreshAfterWriteInMillis = refreshAfterWriteInMillis;
    }

    /** Builds the cache and performs the initial warm-up. */
    @PostConstruct
    public void init() {
        serviceCache = Caffeine.newBuilder()
                .refreshAfterWrite(Duration.ofMillis(refreshAfterWriteInMillis))
                .executor(caffeineAsyncCacheExecutor)
                .build(asyncCacheLoader);
        initializeCache();
    }

    /**
     * Warms the cache with the active event ids. Failures are logged and otherwise ignored so that
     * a failed warm-up does not propagate out of {@link #init()}.
     */
    private void initializeCache() {
        try {
            List<Long> preEvents = ...;
            if (preEvents.isEmpty()) {
                log.info("There are no active events for cache ...");
                return;
            }
            List<BigDecimal> activeEventIds = preEvents-map-to-bigdecimal;
            // The cache is keyed by BigDecimal, so the converted ids must be used here; passing
            // the raw Long list would request keys of the wrong type.
            serviceCache.getAll(activeEventIds);
        } catch (Exception ex) {
            log.error("Exception while initializing cache", ex);
        }
    }

    /**
     * Returns the cached value for a single event, loading it through the cache loader if absent.
     *
     * @param eventId cache key of the event
     * @return the value held by the cache for {@code eventId}
     */
    public Object getEvent(BigDecimal eventId) {
        // Author's observation: the loader was seen running on a CaffeineAsyncCacheExecutor- thread.
        // NOTE(review): possibly a refreshAfterWrite reload rather than the initial load — confirm.
        return serviceCache.get(eventId);
    }

    /**
     * Returns the cached values for all currently active events.
     *
     * @return a new list holding the values returned by the bulk cache lookup
     */
    public List<Object> getEvents() {
        List<Long> preEvents = ...;
        List<BigDecimal> activeEventIds = preEvents-map-to-bigdecimal;
        // Author's observation: this getAll() call runs the loader on the main (calling) thread.
        Map<BigDecimal, Object> allEvents = serviceCache.getAll(activeEventIds);
        return new ArrayList<>(allEvents.values());
    }
}
- Finally, here is my AsyncCacheLoader:
/**
 * Caffeine {@code CacheLoader} that fetches events from the remote service through a Feign client,
 * one at a time in {@link #load} and in paged batches in {@link #loadAll}.
 */
@Component
public class AsyncCacheLoader implements CacheLoader<BigDecimal, Object> {
    private final MyFeignClient feignClient;

    /**
     * Loads a single event.
     *
     * @param event id of the event to fetch
     * @return the fetched event, or the {@code notFound} placeholder when the client returns
     *         {@code null} or throws
     */
    @Override
    public @Nullable Object load(@NonNull BigDecimal event) {
        // Thread.currentThread().getName() => prints CaffeineAsyncCacheExecutor-1
        Object notFound = ...
        try {
            // Use the method parameter; there is no separate eventId variable in scope.
            Object response = feignClient.getEvent(event);
            return Objects.requireNonNullElse(response, notFound);
        } catch (Exception ex) {
            log.error("load ", ex);
            return notFound;
        }
    }

    /**
     * Loads many events at once. The requested ids are split into partitions of
     * {@code PARTITION_SIZE}; each partition is fetched page by page until the reported total
     * page count is reached. A failing page is logged and ends paging for that partition only.
     *
     * @param eventsFromCaffeine ids requested by the cache
     * @return the events that were fetched, keyed by event id; ids that could not be fetched are
     *         absent from the map
     * @throws Exception declared by the {@code CacheLoader} contract
     */
    @Override
    public @NonNull Map<@NonNull BigDecimal, @NonNull Object> loadAll(@NonNull Iterable<? extends @NonNull BigDecimal> eventsFromCaffeine) throws Exception {
        // Thread.currentThread().getName() => prints main
        // Copy the requested keys into a List so they can be partitioned.
        List<BigDecimal> events = new ArrayList<>();
        eventsFromCaffeine.forEach(events::add);
        List<List<BigDecimal>> byPartitions = Lists.partition(events, PARTITION_SIZE);
        Map<BigDecimal, Object> bulkResponse = new HashMap<>();
        for (List<BigDecimal> partition : byPartitions) {
            int page = 0;
            SimplePageResponse<Object> response;
            do {
                try {
                    response = feignClient.getEvents(partition, page, PARTITION_SIZE);
                    response.getContent().forEach(item -> {
                        bulkResponse.put(item.getEventId(), item);
                    });
                    page = response.getPage() + 1;
                } catch (Exception ex) {
                    log.error("loadAll", ex);
                    // Empty sentinel page: totalPages of Integer.MIN_VALUE makes the loop
                    // condition false, so paging stops for this partition after a failure.
                    response = SimplePageResponse.<Object>builder()
                            .content(List.of())
                            .totalPages(Integer.MIN_VALUE)
                            .totalElements(0L)
                            .page(0)
                            .size(0)
                            .build();
                }
            } while (page < response.getTotalPages());
        }
        return bulkResponse;
    }
}