I want to implement concurrent map in C++ with the following requirements:
static_assert
at the beginning of the class indicates that in this task it is assumed that only integer numbers can be keys of ConcurrentMap.
The constructor of the class ConcurrentMap<K, V>
takes the number of sub-dictionaries into which the entire key space should be divided.
operator[]
should behave the same way as the analogous operator in map
– if the key key
is present in the dictionary, it should return an object of the Access
class containing a reference to its corresponding value; if key
is absent in the dictionary, a pair (key, V())
should be added to it, and an object of the Access
class containing a reference to the newly added value should be returned.
The structure Access
should behave the same way as in the Synchronized
template – providing a reference to the dictionary value and ensuring synchronization of access to it.
The BuildOrdinaryMap
method should merge together parts of the dictionary and return the entire dictionary as a whole. At the same time, it should be thread-safe, i.e., work correctly when other threads perform operations on ConcurrentMap
.
My implementation:
#include <algorithm>
#include <numeric>
#include <vector>
#include <string>
#include <random>
#include <mutex>
#include <future>
#include <map>
#include <utility>
using namespace std;
template <typename K, typename V>
class ConcurrentMap {
public:
static_assert(is_integral_v<K>, "ConcurrentMap supports only integer keys");
struct Access {
Access(V& value, mutex& m) : ref_to_value(value), guard(m){};
V& ref_to_value;
lock_guard<mutex> guard;
};
explicit ConcurrentMap(size_t bucket_count) : current_thread(bucket_count - 1), bucket_count(bucket_count),conc_map(bucket_count) {};
Access operator[](const K& key){
if(keys_threads.count(key) > 0){
size_t thread_num = keys_threads[key];
return {conc_map[thread_num].second.at(key),conc_map[thread_num].first};
} else{
current_thread = (current_thread == bucket_count - 1) ? 0 : current_thread + 1;
conc_map[current_thread].second[key] = V();
keys_threads[key] = current_thread;
return {conc_map[current_thread].second.at(key),conc_map[current_thread].first};
}
};
map<K, V> BuildOrdinaryMap(){
lock_guard<mutex> guard_map(ordinary_map_mutex);
map<K,V> ordinary_map;
for(auto& thread_map:conc_map){
ordinary_map.insert(thread_map.second.begin(),thread_map.second.end());
}
return ordinary_map;
};
private:
vector<pair<mutex,map<K,V>>> conc_map;
map<K,size_t> keys_threads;
size_t current_thread;
size_t bucket_count;
mutex ordinary_map_mutex;
};
My ide: I will create a vector of the size of the number of buckets/threads; each element of the vector is a pair of mutex and map, where each mutex will protect corresponding map.
Additional explanation to the operator[]
- if key is present (checked by
.count
method inkeys_threads
map), then I retrieve a bucket (number) in which key was saved (size_t thread_num = keys_threads[key];
) and then return anAccess
object, which will give an access to the value of the map under mutex ({conc_map[thread_num].second.at(key),conc_map[thread_num].first};
) - if key is absent: I calculate a bucket/thread, where new element will be saved (if current_thread is equal to the bucket_count – 1, then I reset it to 0, else I increase it by 1), then I add empty element to the bucket (
conc_map[current_thread].second[key] = V();
), then I add key, thread pair to thekeys_threads
map and return anAccess
object, which will give an access to the new value of the map under mutex ({conc_map[current_thread].second.at(key),conc_map[current_thread].first};
)
I run couple of tests, but unfortunately mutex doesn’t protect against simultaneous changes. Can you please give me a hint, where I made mistake/mistakes (I can assume it has to do with thread calculation, but I am not sure).
Tests:
void RunConcurrentUpdates(
ConcurrentMap<int, int>& cm, size_t thread_count, int key_count
) {
auto kernel = [&cm, key_count](int seed) {
vector<int> updates(key_count);
iota(begin(updates), end(updates), -key_count / 2);
shuffle(begin(updates), end(updates), default_random_engine(seed));
for (int i = 0; i < 2; ++i) {
for (auto key : updates) {
cm[key].ref_to_value++;
}
}
};
vector<future<void>> futures;
for (size_t i = 0; i < thread_count; ++i) {
futures.push_back(async(kernel, i));
}
}
void TestConcurrentUpdate() {
const size_t thread_count = 3;
const size_t key_count = 50000;
ConcurrentMap<int, int> cm(thread_count);
RunConcurrentUpdates(cm, thread_count, key_count);
const auto result = cm.BuildOrdinaryMap();
ASSERT_EQUAL(result.size(), key_count);
for (auto& [k, v] : result) {
AssertEqual(v, 6, "Key = " + to_string(k));
}
}
void TestReadAndWrite() {
ConcurrentMap<size_t, string> cm(5);
auto updater = [&cm] {
for (size_t i = 0; i < 50000; ++i) {
cm[i].ref_to_value += 'a';
}
};
auto reader = [&cm] {
vector<string> result(50000);
for (size_t i = 0; i < result.size(); ++i) {
result[i] = cm[i].ref_to_value;
}
return result;
};
auto u1 = async(updater);
auto r1 = async(reader);
auto u2 = async(updater);
auto r2 = async(reader);
u1.get();
u2.get();
for (auto f : {&r1, &r2}) {
auto result = f->get();
ASSERT(all_of(result.begin(), result.end(), [](const string& s) {
return s.empty() || s == "a" || s == "aa";
}));
}
}
where AssertEqual
and ASSERT_EQUAL are custom classes/functions.