I implemented a custom Vault ConfigProvider with dynamic credential rotation, returning ConfigData(Map<String, String> data, Long ttl), for a MongoDB sink connector on Kafka Connect.
The first version of the plugin works fine:
public ConfigData get(String path, Set<String> keys) {
    Map<String, String> data = new HashMap<>();
    Map<String, String> properties = vault.getDynamicDatabseSecret(vault, path);
    Long ttl = vault.getLeasDuration(vault, path);
    for (String key : keys) {
        String value = properties.get(key);
        if (value != null) {
            data.put(key, value);
        }
    }
    return new ConfigData(data, ttl * 1000); // ttl is in seconds, ConfigData expects ms
}
This version works: the DistributedHerder tick method executes as expected and the tasks are restarted. The only issue is that get() is called many times on the custom ConfigProvider (the number of calls depends on the alias and task count), and each call generates new Vault credentials.
To avoid this, I tried caching the credentials while they are still valid (based on the TTL). I implemented a second version:
private final ConcurrentHashMap<String, CredsMetadata> credsProperties = new ConcurrentHashMap<>();

public ConfigData get(String path, Set<String> keys) {
    Map<String, String> data = new HashMap<>();
    Map<String, String> properties;
    Long ttl;
    CredsMetadata cached = credsProperties.get(path);
    if (cached == null || isNeedRenew(cached.getExpirationTime(), this.minTTLCreds)) {
        // the Vault credentials are missing or about to expire: renew them
        properties = vault.getDynamicDatabseSecret(vault, path);
        ttl = VaultUtils.getLeasDuration(vault, path);
        credsProperties.put(path, new CredsMetadata(properties, LocalDateTime.now().plusSeconds(ttl)));
    } else {
        // no renewal needed: serve the credentials from the cache
        properties = cached.getProperties();
        ttl = Duration.between(LocalDateTime.now(), cached.getExpirationTime()).getSeconds();
    }
    for (String key : keys) {
        String value = properties.get(key);
        if (value != null) {
            data.put(key, value);
        }
    }
    return new ConfigData(data, ttl * 1000); // ttl is in seconds, ConfigData expects ms
}
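For context, `CredsMetadata` and `isNeedRenew` are not shown above; here is a minimal sketch of what they could look like, inferred only from how the snippet calls them (the field names and the renewal rule are assumptions, not my actual implementation):

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;

// Cache entry: the resolved credentials plus their absolute expiration time.
class CredsMetadata {
    private final Map<String, String> properties;
    private final LocalDateTime expirationTime;

    CredsMetadata(Map<String, String> properties, LocalDateTime expirationTime) {
        this.properties = properties;
        this.expirationTime = expirationTime;
    }

    Map<String, String> getProperties() { return properties; }
    LocalDateTime getExpirationTime() { return expirationTime; }
}

class RenewCheck {
    // Renew when fewer than minTTLCreds seconds remain before expiry.
    static boolean isNeedRenew(LocalDateTime expirationTime, long minTTLCreds) {
        return Duration.between(LocalDateTime.now(), expirationTime).getSeconds() < minTTLCreds;
    }
}
```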
When I check the logs, I see this from the DistributedHerder class:
Skipping config updates with incremental cooperative rebalancing since no config rebalance is required and there are no connector config, task config, or target state changes pending
I added some logs and saw that AbstractHerder.taskConfigsChanged() is comparing the resolved Vault credentials against the new ones; since the cache returns the same credentials on both sides, they are always equal and no restart is triggered.
Can you tell me if I am missing some configuration, or if the ConfigData values must change on every call (i.e. get() must always return new Vault credentials)?