I am noticing unexpected behavior when iterating over a ConcurrentHashMap.
Scenario:
- A single publisher thread constantly adds new keys to the ConcurrentHashMap. Note that keys are added in sequential order.
- Multiple subscriber threads each try to read all values currently present in the ConcurrentHashMap.
Many subscribers seem to be missing keys that were added much earlier. For example, when a subscriber reads the map and the result has size 100, I expect the subscriber to see at least keys 0 to 99. However, it does not return all of those keys. In other words, even after a key such as key 100 has been added to the map, that key appears to be missing on some later reads. The sample code below reproduces the issue.
As for an explanation, my guess is that when a key collision is being resolved by chaining and a read-all request comes in, the colliding keys are not returned. However, I would like to know whether anyone else has seen this and can provide a better explanation for the behavior below.
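To make the expectation concrete, the check that each subscriber performs can be sketched in isolation: given a key set of size n, every key from `key-0` to `key-(n-1)` should be present. The class name `MissingKeyCheck` and the helper `missingKeys` are hypothetical names used only for this illustration; the logic mirrors the filter in the full reproduction.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

class MissingKeyCheck {
    // Returns every index in [0, keys.size()) whose "key-<index>" entry is absent.
    static List<Integer> missingKeys(Set<String> keys) {
        return IntStream.range(0, keys.size())
                .filter(i -> !keys.contains("key-" + i))
                .boxed()
                .toList();
    }

    public static void main(String[] args) {
        // Complete view: size 3 and keys 0..2 are all present.
        System.out.println(missingKeys(Set.of("key-0", "key-1", "key-2"))); // prints []
        // Incomplete view: size 3, but key-1 is absent while key-3 is present.
        System.out.println(missingKeys(Set.of("key-0", "key-2", "key-3"))); // prints [1]
    }
}
```

A non-empty result is what the subscribers report as "missing keys".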
The code below has 3 variants:
- one where the problem shows up (`test()`)
- one where the map is kept big enough that collisions "do not occur", in which case a complete view is received on a random basis (`testWithNoCollisions()`)
- one where locks are used to ensure that writers are blocked while a subscriber is reading the whole map (`testWithLocks()`)
```java
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentHashMapWeakConsistency {

    public static void main(String[] args) throws InterruptedException {
        test();
        // testWithNoCollisions(); // Sometimes it passes when there is no collision
        // testWithLocks(); // This one works but it has an external synchronization mechanism.
    }

    public static void test() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();
        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });
        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);
        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
                var size = existingKeys.size();
                // Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();
                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = "key-" + missingKeys.get(0);
                    var missingEnd = "key-" + missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));
        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithNoCollisions() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>(1_000_000, .25f);
        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 10_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });
        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);
        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
                var size = existingKeys.size();
                // Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();
                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));
        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithLocks() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();
        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        // Used as a simple spin lock shared by the publisher and all subscribers.
        var subscriberActive = new AtomicBoolean(false);
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                // get the subscriber lock
                while (!subscriberActive.compareAndSet(false, true)) ;
                map.put(key, i);
                subscriberActive.compareAndSet(true, false);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });
        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);
        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                // Copy the key set while holding the lock so the publisher cannot write.
                while (!subscriberActive.compareAndSet(false, true)) ;
                var existingKeys = new HashSet<>(map.keySet());
                subscriberActive.compareAndSet(true, false);
                var size = existingKeys.size();
                // Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .mapToObj(i -> Integer.valueOf(i))
                        .collect(Collectors.toList());
                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));
        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }
}
```
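The locking idea in `testWithLocks` (writers are blocked while a subscriber copies the key set) can also be expressed with a `java.util.concurrent.locks.ReentrantReadWriteLock` instead of the `AtomicBoolean` spin loop. This is only a sketch of that alternative, not what the test above uses; the class `LockedSnapshotSketch` and its methods `publish`, `snapshotKeys`, `isGapFree`, and `run` are hypothetical names chosen for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockedSnapshotSketch {
    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // The writer takes the exclusive lock, so no put can overlap a key-set copy.
    void publish(int i) {
        lock.writeLock().lock();
        try {
            map.put("key-" + i, i);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the lock with each other but exclude the writer while copying.
    Set<String> snapshotKeys() {
        lock.readLock().lock();
        try {
            return new HashSet<>(map.keySet());
        } finally {
            lock.readLock().unlock();
        }
    }

    // True when a set of size n holds exactly key-0 .. key-(n-1).
    static boolean isGapFree(Set<String> keys) {
        for (int i = 0; i < keys.size(); i++) {
            if (!keys.contains("key-" + i)) return false;
        }
        return true;
    }

    // Publishes `total` sequential keys on one thread while the caller keeps taking snapshots.
    static boolean run(int total) {
        var sketch = new LockedSnapshotSketch();
        var publisher = new Thread(() -> {
            for (int i = 0; i < total; i++) sketch.publish(i);
        });
        publisher.start();
        boolean allGapFree = true;
        while (publisher.isAlive()) {
            allGapFree &= isGapFree(sketch.snapshotKeys());
        }
        try {
            publisher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // One final snapshot after the publisher has finished.
        return allGapFree && isGapFree(sketch.snapshotKeys());
    }

    public static void main(String[] args) {
        System.out.println("all snapshots gap-free: " + run(20_000)); // prints "all snapshots gap-free: true"
    }
}
```

The trade-off is the same as in `testWithLocks`: every write waits for in-progress reads, so the map no longer gives lock-free writes.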
It appears that with ConcurrentHashMap, when writes happen concurrently with full reads of the map, a read often returns results that are missing keys added in the past. I was expecting a read to provide a snapshot of the map at a point in time. However, that does not seem to be the case based on the above test code.
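For reference, the `java.util.concurrent` documentation describes ConcurrentHashMap iterators as weakly consistent rather than as point-in-time snapshots: they traverse the elements that existed when the iterator was created, and may or may not reflect later modifications. A minimal single-threaded sketch of that documented behavior follows; the class `WeaklyConsistentIterationSketch` and the method `iterateWhileAdding` are hypothetical names, and the sketch does not by itself explain the multi-threaded result above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class WeaklyConsistentIterationSketch {
    // Iterates the key set while inserting up to `added` new keys mid-iteration
    // and returns the keys the iterator actually produced.
    static Set<String> iterateWhileAdding(int initial, int added) {
        var map = new ConcurrentHashMap<String, Integer>();
        for (int i = 0; i < initial; i++) {
            map.put("key-" + i, i);
        }
        var seen = new HashSet<String>();
        int next = initial;
        // No ConcurrentModificationException is thrown even though the map changes here.
        for (String key : map.keySet()) {
            seen.add(key);
            if (next < initial + added) {
                map.put("key-" + next, next);
                next++;
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        var seen = iterateWhileAdding(100, 100);
        boolean sawAllInitial = true;
        for (int i = 0; i < 100; i++) {
            sawAllInitial &= seen.contains("key-" + i);
        }
        // Keys present before iteration began are all reported.
        System.out.println("saw all initial keys: " + sawAllInitial); // prints "saw all initial keys: true"
        // How many of the 100 keys added during iteration were also seen is not specified.
        System.out.println("keys seen in total: " + seen.size());
    }
}
```

Under this reading, the size of a copied key set can include some keys added during the copy while omitting others added in the same window, so a result of size n need not contain every key from 0 to n-1.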