I am trying to listen for the Change Stream for mongodb to a collection say UserCollection. I have tried solution available but the existing solution not working. I am using dependency : spring-boot-starter-data-mongodb having version 4.2.5. The below code is not working
// Select the MongoDB database
MongoDatabase database = mongoTemplate.getDb();
<code> // Select the collection to query
MongoCollection<Document> collection = database.getCollection("UserProfile");
// Create pipeline for operationType filter
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in(
"operationType",
Arrays.asList("insert", "update", "delete")
)));
// Create the Change Stream
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// Iterate over the Change Stream
for (ChangeStreamDocument<Document> changeEvent : changeStream) {
// Process the change event here
switch (changeEvent.getOperationType()) {
case INSERT:
System.out.println("MongoDB Change Stream detected an insert");
break;
case UPDATE:
System.out.println("MongoDB Change Stream detected an update");
break;
case DELETE:
System.out.println("MongoDB Change Stream detected a delete");
break;
}
}
</code>
<code> // Select the collection to query
MongoCollection<Document> collection = database.getCollection("UserProfile");
// Create pipeline for operationType filter
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in(
"operationType",
Arrays.asList("insert", "update", "delete")
)));
// Create the Change Stream
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// Iterate over the Change Stream
for (ChangeStreamDocument<Document> changeEvent : changeStream) {
// Process the change event here
switch (changeEvent.getOperationType()) {
case INSERT:
System.out.println("MongoDB Change Stream detected an insert");
break;
case UPDATE:
System.out.println("MongoDB Change Stream detected an update");
break;
case DELETE:
System.out.println("MongoDB Change Stream detected a delete");
break;
}
}
</code>
// Select the collection to query
MongoCollection<Document> collection = database.getCollection("UserProfile");
// Create pipeline for operationType filter
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in(
"operationType",
Arrays.asList("insert", "update", "delete")
)));
// Create the Change Stream
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// Iterate over the Change Stream
for (ChangeStreamDocument<Document> changeEvent : changeStream) {
// Process the change event here
switch (changeEvent.getOperationType()) {
case INSERT:
System.out.println("MongoDB Change Stream detected an insert");
break;
case UPDATE:
System.out.println("MongoDB Change Stream detected an update");
break;
case DELETE:
System.out.println("MongoDB Change Stream detected a delete");
break;
}
}