Concurrent HashMap weak consistency missing past keys during iteration – multi thread pub/sub

Noticing weird behavior with ConcurrentHashMap iteration.
Scenario:

  • Single publisher thread constantly adds new keys to the concurrent hashmap. Note keys are added in sequential order.
  • Multiple subscriber thread tries to read all present values in the concurrent hashmap.

Many subscribers seem to be missing keys which were added much earlier. Example, when a subscriber reads the map with size 100. It is expected that the subscriber will see atleast keys from 0 to 99. However, it seems it does not return all the keys. Below is the sample code that reproduces the issue. In other, words even after example key 100 is added in the map, on random reads, key 100 appear to be missing.

In terms of explanation, I am thinking that when a key is being resolved by chaining and a read all requests comes in, then the collision related keys are not returned. However, I am looking if anyone else has seen this and can provide a better explanation for the below behavior.

Below code has 3 variants:

  1. one where the problem shows up (test())
  2. one where if the map is kept big enough and collisions ~”do not occur”, then complete view is received on random basis (testWithNoCollisions).
  3. one where locks are used to ensure when a subscriber is reading the whole map, then the writers are blocked (testWithLocks).
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentHashMapWeakConsistency {

    public static void main(String[] args) throws InterruptedException {
        test();
//        testWithNoCollisions();// Sometimes it passes when there is no collision
//        testWithLocks();// This one works but it has external synchronize mechanism.
    }

    public static void test() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.n", i);
                }
            }
            System.out.printf("Published all %d messagesn", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
                var size = existingKeys.size();

                //Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = "key-" + missingKeys.get(0);
                    var missingEnd = "key-" + missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithNoCollisions() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>(1_000_000, .25f);

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 10_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.n", i);
                }
            }
            System.out.printf("Published all %d messagesn", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
                var size = existingKeys.size();

                //Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithLocks() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        var subscriberActive = new AtomicBoolean(false);
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                // get the subscriber lock
                while (!subscriberActive.compareAndSet(false, true)) ;
                map.put(key, i);
                subscriberActive.compareAndSet(true, false);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.n", i);
                }
            }
            System.out.printf("Published all %d messagesn", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                while (!subscriberActive.compareAndSet(false, true)) ;
                var existingKeys = new HashSet<>(map.keySet());
                subscriberActive.compareAndSet(true, false);

                var size = existingKeys.size();

                //Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .mapToObj(i -> Integer.valueOf(i))
                        .collect(Collectors.toList());

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }
}

It appears that in ConcurrenHashMap, when concurrent writes and full reads of the map are done, then the read returns results which are often missing keys from the past. I was expecting that it would provide a snapshot of the map at a point of time. However that does not seem to be the result based on the above test code.

New contributor

telu is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật