I’m using MongoDB as the persistent store and Apache Ignite as a cache, with read-through and write-through enabled. Here is the relevant part of my code.
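To give the full picture, the store is wired into the cache roughly as in the sketch below. This is not my exact configuration: the cache name "testCache", the Long/byte[] key and value types, and the way the KeyValuePersistenceSettings instance is obtained are placeholders.

import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;

public class CacheSetup {
    static IgniteCache<Long, byte[]> startCache(KeyValuePersistenceSettings settings) {
        CacheConfiguration<Long, byte[]> cacheCfg = new CacheConfiguration<>("testCache");

        cacheCfg.setReadThrough(true);          // cache misses call MongoDBCacheStore.load()
        cacheCfg.setWriteThrough(true);         // puts call MongoDBCacheStore.write()/writeAll()
        cacheCfg.setWriteBehindEnabled(false);  // write-behind turned off while debugging

        cacheCfg.setCacheStoreFactory(
            FactoryBuilder.factoryOf(new MongoDBCacheStore<Long, byte[]>(settings)));

        Ignite ignite = Ignition.start();
        return ignite.getOrCreateCache(cacheCfg);
    }
}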
public class MongoDBCacheStore<K, V> implements CacheStore<K, V> {
    /** Thread name. */
    private static final String CACHE_LOADER_THREAD_NAME = "mongo-cache-loader";

    /** Auto-injected Ignite instance. */
    @SuppressWarnings("unused")
    @IgniteInstanceResource
    private Ignite ignite;

    /** Auto-injected store session. */
    @SuppressWarnings("unused")
    @CacheStoreSessionResource
    private CacheStoreSession storeSes;

    /** Auto-injected logger. */
    @LoggerResource
    private IgniteLogger log;

    /** Persistence settings (key name/type, database and collection names). */
    private final PersistenceController controller;

    UpdateOptions updateOptions = new UpdateOptions().upsert(true);

    BulkWriteOptions bulkWriteOptions = new BulkWriteOptions().ordered(true);

    public MongoDBCacheStore(KeyValuePersistenceSettings settings) {
        this.controller = new PersistenceController(settings);
    }
    @Override
    public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) throws CacheLoaderException {
        // Initial cache loading is not implemented: nothing is read back from MongoDB here.
        if (clo == null) {
            return;
        }
    }
    @Override
    public V load(final K key) throws CacheLoaderException {
        if (null == key) {
            log.error("MongoDBCacheStore load key=null");
            return null;
        }

        var collection = getCurCollection();
        if (collection == null) {
            log.error("MongoDBCacheStore load collection=null key=" + key.toString());
            return null;
        }

        var transKey = IgniteMongoDBHelper.INSTANCE.convertKey(key, controller.getKeyType());

        BsonDocument bsonDoc = collection.find(eq(controller.getKeyName(), transKey)).first();
        if (null == bsonDoc) {
            log.error("MongoDBCacheStore load bsonDoc=null collection=" + controller.getCollectionName() +
                " key=" + transKey.toString());
            return null;
        }

        // Convert the found document back into raw BSON bytes, the format the cache stores as its value.
        String json = bsonDoc.toJson();
        RawBsonDocument rawBson = RawBsonDocument.parse(json);
        ByteBuf buffer = rawBson.getByteBuffer();

        return (V) buffer.array();
    }
    @SuppressWarnings({"unchecked"})
    @Override
    public Map<K, V> loadAll(Iterable<? extends K> keys) {
        assert keys != null;

        Map<K, V> loaded = new HashMap<>();

        for (K key : keys) {
            V v = load(key);

            if (v != null)
                loaded.put(key, v);
        }

        return loaded;
    }
    @Override
    public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
        if (null == entry || null == entry.getValue()) {
            log.error("MongoDBCacheStore write entry=null or value=null");
            return;
        }

        var collection = getCurCollection();
        if (collection == null) {
            return;
        }

        var key = IgniteMongoDBHelper.INSTANCE.convertKey(entry.getKey(), controller.getKeyType());

        // The cached value is expected to be the raw BSON bytes of the whole document.
        RawBsonDocument rawBson = new RawBsonDocument((byte[]) entry.getValue());

        try {
            collection.updateOne(eq(controller.getKeyName(), key), rawBson, updateOptions);
        } catch (MongoException me) {
            log.error("Unable to updateOne key=" + entry.getKey() + " due to an error: " + me);
        }
    }
    @Override
    public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
        if (entries == null || entries.isEmpty()) {
            log.error("MongoDBCacheStore writeAll entries=null or empty");
            return;
        }

        var collection = getCurCollection();
        if (collection == null) {
            log.error("MongoDBCacheStore writeAll collection=null");
            return;
        }

        try {
            List<WriteModel<RawBsonDocument>> requests = new ArrayList<>();

            for (Cache.Entry<?, ?> entry : entries) {
                RawBsonDocument rawBson = new RawBsonDocument((byte[]) entry.getValue());

                var key = IgniteMongoDBHelper.INSTANCE.convertKey(entry.getKey(), controller.getKeyType());

                UpdateOneModel<RawBsonDocument> update = new UpdateOneModel<>(
                    eq(controller.getKeyName(), key), rawBson, updateOptions);

                requests.add(update);

                // Debug output of each document being written.
                log.error(rawBson.toJson());
            }

            collection.bulkWrite(requests, bulkWriteOptions);
        } catch (MongoBulkWriteException e) {
            log.error("A MongoBulkWriteException occurred with the following message: " + e.getMessage());
        }
    }
    @Override
    public void delete(Object key) throws CacheWriterException {
        if (null == key) {
            log.error("MongoDBCacheStore delete key=null");
            return;
        }

        Long longKey = Long.parseLong(String.valueOf(key));

        var collection = getCurCollection();
        if (collection == null) {
            log.error("MongoDBCacheStore delete collection=null");
            return;
        }

        try {
            collection.deleteOne(eq(controller.getKeyName(), longKey));
        } catch (MongoException me) {
            log.error("Unable to delete due to an error: " + me + " database=" +
                controller.getDataBaseName() + " collection=" + controller.getCollectionName() +
                " keyName=" + controller.getKeyName());
        }
    }
    @Override
    public void deleteAll(Collection<?> keys) throws CacheWriterException {
        if (keys == null || keys.isEmpty()) {
            log.error("MongoDBCacheStore deleteAll keys=null or empty");
            return;
        }

        var collection = getCurCollection();
        if (collection == null) {
            log.error("MongoDBCacheStore deleteAll collection=null");
            return;
        }

        try {
            List<WriteModel<RawBsonDocument>> requests = new ArrayList<>();

            for (Object key : keys) {
                DeleteOneModel<RawBsonDocument> deleteOne = new DeleteOneModel<>(
                    eq(controller.getKeyName(), key));

                requests.add(deleteOne);
            }

            collection.bulkWrite(requests);
        } catch (MongoBulkWriteException e) {
            log.error("Unable to deleteAll due to an error: " + e + " database=" +
                controller.getDataBaseName() + " collection=" + controller.getCollectionName() +
                " keyName=" + controller.getKeyName());
        }
    }
    private MongoCollection<BsonDocument> getCurCollection() {
        MongoClient mongoClient = MongoDBManager.INSTANCE.getMongoClient();
        if (mongoClient == null) {
            log.error("MongoDBCacheStore getCurCollection mongoClient=null");
            return null;
        }

        MongoDatabase database = mongoClient.getDatabase(controller.getDataBaseName());
        if (database == null) {
            log.error("MongoDBCacheStore getCurCollection database=null databaseName=" +
                controller.getDataBaseName());
            return null;
        }

        return database.getCollection(controller.getCollectionName(), BsonDocument.class);
    }
    @Override
    public void sessionEnd(boolean commit) throws CacheWriterException {
        // No transactional buffering is implemented, so there is nothing to commit or roll back here.
        if (!storeSes.isWithinTransaction())
            return;
    }
}
Sometimes some of the data written to Ignite is never persisted to MongoDB, while other data is. To rule out interference from write-behind, I disabled it, so writes go through write-through only. I am also confident it is not a MongoDB storage failure, because I checked the relevant MongoDB logs. Why is this happening?
What could be the possible reasons for this issue, and how can I pinpoint them?
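For reference, the kind of minimal check I use to observe the problem looks roughly like the sketch below. It is not my exact code: the cache/database/collection names, the Long/byte[] key and value types, and the "id" key field are placeholders standing in for whatever my persistence settings actually define.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.bson.BsonDocument;
import org.bson.RawBsonDocument;

public class WriteThroughCheck {
    static void verifyWriteThrough(Ignite ignite, MongoClient mongoClient) {
        IgniteCache<Long, byte[]> cache = ignite.cache("testCache");
        MongoCollection<BsonDocument> collection =
            mongoClient.getDatabase("testDb").getCollection("testCollection", BsonDocument.class);

        // Write entries through the cache. With write-through on and write-behind off,
        // every put should invoke MongoDBCacheStore.write() synchronously.
        for (long i = 0; i < 1000; i++) {
            RawBsonDocument doc = RawBsonDocument.parse("{\"id\": " + i + ", \"payload\": \"x\"}");
            cache.put(i, doc.getByteBuffer().array());
        }

        // Then look every key up directly in MongoDB and report the ones that never arrived.
        for (long i = 0; i < 1000; i++) {
            if (collection.find(Filters.eq("id", i)).first() == null)
                System.out.println("Missing in MongoDB after write-through: key=" + i);
        }
    }
}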