I am trying to setup a deployment level change stream with a pipeline filter on collection name using MongoDB java SDK. Here is the code snippet.
List<Bson> pipeline = Collections.singletonList(Aggregates.match(Filters.or(
Filters.eq("namespace", "db1.c1"),
Filters.eq("namespace", "db1.c2"))));
client.watch(pipeline)
.cursor()
.forEachRemaining(doc -> {
System.out.println(doc);
});
But this query does not match any document. Following variations of the pipeline document does not work either.
List<Bson> pipeline =
Collections.singletonList(Aggregates.match(Filters.or(
Document.parse("{'namespace': 'db1.c1'}"),
Document.parse("{'namespace': 'db1.c2'}"))));
Surprisingly pipeline works on other fields of the changestream document. For example, this works:
List<Bson> pipeline = Collections
.singletonList(Aggregates.match(Filters.and(
Document.parse("{'fullDocument.seq': 4}"),
Filters.in("operationType", Arrays.asList("insert")))));
I am not sure why this would be the case. I would appreciate any help in this regard.
in Order to get changestream events on DB or collection level you need to use filters on below attributes :
ns.db The name of the database.
ns.coll The name of the collection.
In your case if you are interested only for collection C1 & c2 then consider having filter something like below.
Please Note: above filter is on collections only you can add filter on DB for collection form specific DB like (ns.db = "db1" AND ns.coll : $in :["c1", "c2"] )
MongoDB doc reference: https://docs.mongodb.com/manual/reference/change-events/#change-stream-output