For a project, we are trying to improve existing performance by upgrading from the MongoDB sync driver to the reactive driver. After the upgrade, the performance improvement was negligible. To verify the gap, I wrote a test that pushes 30K documents (each ~256 KB); the two drivers took almost the same time. What can be done to improve performance when upgrading to the reactive driver?
Reactive code:
Driver version: org.mongodb:mongodb-driver-reactivestreams:5.0.1
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
public class MongoDBExample2 {

    private static final Logger logger = LoggerFactory.getLogger(MongoDBExample2.class);

    // Payload shared by every generated document (~256 KB of text read from a file).
    private static String str;

    static {
        try {
            str = new String(Files.readAllBytes(Paths.get("<path-to-file-with-content>")));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MongoClient mongoClient = MongoClients.create("<connection-string>");

        // Get a database and collection
        MongoDatabase database = mongoClient.getDatabase("testdb");
        MongoCollection<Document> collection = database.getCollection("mock1");

        List<WriteModel<Document>> documents = getMockData();
        CompletableFuture<BulkWriteResult> futureResult = bulkWriteDocuments(collection, documents);
        futureResult.thenAccept(bulkWriteResult -> {
            logger.debug("Bulk write completed: {}", bulkWriteResult);
        }).exceptionally(throwable -> {
            logger.error("Error occurred during bulk write: ", throwable);
            return null;
        });

        // Block until the single bulk write finishes so the run can be timed
        // and compared against the sync driver.
        futureResult.get();
        mongoClient.close();
    }
    private static List<WriteModel<Document>> getMockData() {
        List<WriteModel<Document>> docs = new ArrayList<>();
        List<Document> documents = generateLargeDocument(30_000);
        for (Document doc : documents) {
            docs.add(new InsertOneModel<>(doc));
        }
        return docs;
    }
    public static CompletableFuture<BulkWriteResult> bulkWriteDocuments(MongoCollection<Document> collection, List<WriteModel<Document>> documents) {
        CompletableFuture<BulkWriteResult> future = new CompletableFuture<>();
        Instant start = Instant.now();
        // bulkWrite returns a Publisher that emits a single BulkWriteResult,
        // so request(1) is sufficient and onNext completes the future.
        collection.bulkWrite(documents).subscribe(new Subscriber<BulkWriteResult>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                subscription.request(1);
            }

            @Override
            public void onNext(BulkWriteResult bulkWriteResult) {
                System.out.println(" Time taken : " + Duration.between(start, Instant.now()).getSeconds());
                future.complete(bulkWriteResult);
            }

            @Override
            public void onError(Throwable throwable) {
                future.completeExceptionally(throwable);
            }

            @Override
            public void onComplete() {
                if (!future.isDone()) {
                    future.completeExceptionally(new RuntimeException("Bulk write operation completed without emitting a result"));
                }
            }
        });
        return future;
    }
    private static List<Document> generateLargeDocument(int numDocuments) {
        List<Document> documents = new ArrayList<>(numDocuments);
        Instant op = Instant.now();
        for (int i = 0; i < numDocuments; i++) {
            Document document = new Document();
            document.append("Idd", UUID.randomUUID().toString())
                    .append("content", str);
            documents.add(document);
        }
        System.out.println("Time taken to generate docs : " + Duration.between(op, Instant.now()).getSeconds());
        return documents;
    }
}
Sync code:
Driver version: org.mongodb:mongodb-driver:3.10.0
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class MongoDBExample {

    private static final Logger logger = LoggerFactory.getLogger(MongoDBExample.class);

    // Payload shared by every generated document (~256 KB of text read from a file).
    private static String str;

    static {
        try {
            str = new String(Files.readAllBytes(Paths.get("<path-to-file-with-content>")));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        MongoClient mongoClient = MongoClients.create("<connection-string>");
        MongoDatabase database = mongoClient.getDatabase("testdb");
        MongoCollection<Document> collection = database.getCollection("mock1");

        List<WriteModel<Document>> documents = getMockData();
        Instant start = Instant.now();
        collection.bulkWrite(documents);
        System.out.println(
                "Time taken to bulk write docs : " + Duration.between(start, Instant.now()).getSeconds());
        mongoClient.close();
    }
    private static List<WriteModel<Document>> getMockData() {
        List<WriteModel<Document>> docs = new ArrayList<>();
        List<Document> documents = generateLargeDocument(30_000);
        for (Document doc : documents) {
            docs.add(new InsertOneModel<>(doc));
        }
        return docs;
    }
    private static List<Document> generateLargeDocument(int numDocuments) {
        List<Document> documents = new ArrayList<>(numDocuments);
        Instant op = Instant.now();
        for (int i = 0; i < numDocuments; i++) {
            Document document = new Document();
            document.append("Idd", UUID.randomUUID().toString())
                    .append("content", str);
            documents.add(document);
        }
        System.out.println("Time taken to generate docs : " + Duration.between(op, Instant.now()).getSeconds());
        return documents;
    }
}
Avg time taken by the reactive driver to push 30K records: 2229 seconds
Avg time taken by the sync driver to push 30K records: 2259 seconds
How should we integrate/upgrade to the MongoDB reactive library to actually see a performance improvement? Is there a specific way to implement it, a workaround with Java threads, or something else?
In our project we process approximately 10K insert, update and delete operations per second, and we would like to improve this metric. We currently rely mostly on the bulk write function of the sync driver.
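For example, the kind of change we have been considering is sketched below: splitting the write models into smaller batches and letting the reactive driver run the bulkWrite calls concurrently instead of as one large sequential operation. This is only a minimal sketch, not something we have benchmarked; it reuses the bulkWriteDocuments helper from the reactive example above, and the bulkWriteInBatches name and the batch size are illustrative assumptions.

    // Hypothetical helper (not part of our current code): fan the inserts out as
    // several smaller concurrent bulkWrite calls instead of one large call.
    // Reuses bulkWriteDocuments(...) from MongoDBExample2 above.
    private static CompletableFuture<Void> bulkWriteInBatches(
            MongoCollection<Document> collection,
            List<WriteModel<Document>> documents,
            int batchSize) {
        List<CompletableFuture<BulkWriteResult>> futures = new ArrayList<>();
        for (int i = 0; i < documents.size(); i += batchSize) {
            // Each sublist becomes an independent bulkWrite publisher that the
            // driver can execute concurrently with the others.
            List<WriteModel<Document>> batch =
                    documents.subList(i, Math.min(i + batchSize, documents.size()));
            futures.add(bulkWriteDocuments(collection, batch));
        }
        // Completes once every batch has finished (or fails if any batch fails).
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

Is something along these lines the right direction, or is there a more idiomatic way to get concurrency out of the reactive driver for this workload?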