I need to update set of documents in a Collection. It will be updated with new fields & I want to add the updated documents to a new collection for auditing purposes. However since we cannot have two merge operations in a pipeline I’m hoping to do it in the following manner.
Are there any alternatives or better approaches for this. Since when using this method it returns all the documents of the collection regardless of filtering in prior stages of pipeline and basically I’m using a If condition to filter it out.
<code>public static void main(String[] args) {
String uri = "mongodb://localhost:27017/?retryWrites=false";
ServerApi serverApi = ServerApi.builder()
.version(ServerApiVersion.V1)
.build();
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(uri))
.serverApi(serverApi)
.build();
try (MongoClient client = MongoClients.create(settings)) {
MongoDatabase database = client.getDatabase("myDB");
MongoCollection<Document> collection = database.getCollection("myCollection1");
MongoCollection<Document> auditCollection = database.getCollection("auditCollection");
// Created with Studio 3T, the IDE for MongoDB - https://studio3t.com/
Consumer<Document> processBlock = new Consumer<Document>() {
@Override
public void accept(Document doc) {
if (doc.containsKey("ack")) {
auditCollection.insertOne(new Document("_id", doc.get("_id"))
.append("title", doc.get("title")));
}
}
};
List<? extends Bson> pipeline = Arrays.asList(
new Document()
.append("$match", new Document()
.append("type", "city")
),
new Document()
.append("$lookup", new Document()
.append("from", "country")
.append("localField", "cId")
.append("foreignField", "_id")
.append("as", "countryData")
),
new Document()
.append("$unwind", new Document()
.append("path", "$countryData")
.append("preserveNullAndEmptyArrays", true)
),
new Document()
.append("$set", new Document()
.append("data.desc", "$countryData.description")
.append("ack", false)
),
new Document()
.append("$project", new Document()
.append("description", 1)
),
new Document()
.append("$merge", new Document()
.append("into", "myCollection1")
.append("on", "_id")
.append("whenMatched", "merge")
));
collection.aggregate(pipeline)
.allowDiskUse(true)
.forEach(processBlock);
} catch (MongoException e) {
System.out.println(e.getMessage());
}
}
</code>
<code>public static void main(String[] args) {
String uri = "mongodb://localhost:27017/?retryWrites=false";
ServerApi serverApi = ServerApi.builder()
.version(ServerApiVersion.V1)
.build();
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(uri))
.serverApi(serverApi)
.build();
try (MongoClient client = MongoClients.create(settings)) {
MongoDatabase database = client.getDatabase("myDB");
MongoCollection<Document> collection = database.getCollection("myCollection1");
MongoCollection<Document> auditCollection = database.getCollection("auditCollection");
// Created with Studio 3T, the IDE for MongoDB - https://studio3t.com/
Consumer<Document> processBlock = new Consumer<Document>() {
@Override
public void accept(Document doc) {
if (doc.containsKey("ack")) {
auditCollection.insertOne(new Document("_id", doc.get("_id"))
.append("title", doc.get("title")));
}
}
};
List<? extends Bson> pipeline = Arrays.asList(
new Document()
.append("$match", new Document()
.append("type", "city")
),
new Document()
.append("$lookup", new Document()
.append("from", "country")
.append("localField", "cId")
.append("foreignField", "_id")
.append("as", "countryData")
),
new Document()
.append("$unwind", new Document()
.append("path", "$countryData")
.append("preserveNullAndEmptyArrays", true)
),
new Document()
.append("$set", new Document()
.append("data.desc", "$countryData.description")
.append("ack", false)
),
new Document()
.append("$project", new Document()
.append("description", 1)
),
new Document()
.append("$merge", new Document()
.append("into", "myCollection1")
.append("on", "_id")
.append("whenMatched", "merge")
));
collection.aggregate(pipeline)
.allowDiskUse(true)
.forEach(processBlock);
} catch (MongoException e) {
System.out.println(e.getMessage());
}
}
</code>
public static void main(String[] args) {
String uri = "mongodb://localhost:27017/?retryWrites=false";
ServerApi serverApi = ServerApi.builder()
.version(ServerApiVersion.V1)
.build();
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(uri))
.serverApi(serverApi)
.build();
try (MongoClient client = MongoClients.create(settings)) {
MongoDatabase database = client.getDatabase("myDB");
MongoCollection<Document> collection = database.getCollection("myCollection1");
MongoCollection<Document> auditCollection = database.getCollection("auditCollection");
// Created with Studio 3T, the IDE for MongoDB - https://studio3t.com/
Consumer<Document> processBlock = new Consumer<Document>() {
@Override
public void accept(Document doc) {
if (doc.containsKey("ack")) {
auditCollection.insertOne(new Document("_id", doc.get("_id"))
.append("title", doc.get("title")));
}
}
};
List<? extends Bson> pipeline = Arrays.asList(
new Document()
.append("$match", new Document()
.append("type", "city")
),
new Document()
.append("$lookup", new Document()
.append("from", "country")
.append("localField", "cId")
.append("foreignField", "_id")
.append("as", "countryData")
),
new Document()
.append("$unwind", new Document()
.append("path", "$countryData")
.append("preserveNullAndEmptyArrays", true)
),
new Document()
.append("$set", new Document()
.append("data.desc", "$countryData.description")
.append("ack", false)
),
new Document()
.append("$project", new Document()
.append("description", 1)
),
new Document()
.append("$merge", new Document()
.append("into", "myCollection1")
.append("on", "_id")
.append("whenMatched", "merge")
));
collection.aggregate(pipeline)
.allowDiskUse(true)
.forEach(processBlock);
} catch (MongoException e) {
System.out.println(e.getMessage());
}
}