MongoDB-ChangeStream使用笔记

MongoDB-ChangeStream使用笔记

一、概述

MongoDB的ChangeStreams允许应用程序实时访问数据的变化。应用程序可以使用Change Streams订阅集合上的所有数据的更改,并立即对它们作出响应。利用Change Streams这一功能可以构建实时数据同步的应用。

ChangeSteam 的功能有点类似于 Canal, 可以监听到数据库的实时变更,和 Canal 监听 binlog 类似,ChangesSteam 通过监听oplog 来跟踪数据库变更。ChangeSteam 是 mongo 自带的,不需要额外安装第三方中间件。

ChangeStreams的变化事件类型

insert事件,update事件,delete事件,replace事件,invalidate事件

官方使用文档:https://docs.mongodb.com/manual/changeStreams/

本文实践的场景为:实时监听两个集合的数据变化:实时更新写入到第三个集合中。

本次采用的方式是,MongoDB-JS脚本方式,通过shell脚本执行

二、编写监听集合脚本(watch_user1.js)

var watchCollectName = 'user1';
var writeCollectName = 'user_all';
var watchCursor_data = db.getCollection(watchCollectName).watch();
while (!watchCursor_data.isExhausted()){
	if (watchCursor_data.hasNext()) {
		var operation = watchCursor_data.next();
		if(operation.operationType == 'insert'){
		   var dataObj = db.getCollection(operation.ns.coll).findOne({_id:operation.documentKey._id});
			 db.getCollection(writeCollectName).insert(dataObj);
		}else if(operation.operationType == 'update' || operation.operationType == 'replace'){
		   var dataObj = db.getCollection(operation.ns.coll).findOne({_id:operation.documentKey._id});
			 db.getCollection(writeCollectName).replaceOne({_id:operation.documentKey._id},dataObj,{upsert:true});
		}else if(operation.operationType == 'delete'){
			 db.getCollection(writeCollectName).remove({_id:operation.documentKey._id});
		}
	}
}

三、通过shell脚本执行(watch_mongo.sh)

/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user1.js

监听其他集合方式和此相同,需要注意的是,单个mongodb-js中不能同时监听两个集合,需要各自独立mongodb-js脚本。

shell脚本执行多个时,通过“&”符号方式使得多个能并行执行,例如:

/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user1.js &
/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user2.js &
/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user3.js

四、其他语言支持

ChangeStream在如Java等其他语言中也可支持监听数据流的变化。

如Java中可以使用原生api方式,例如:

MongoDatabase database = mongoClient.getDatabase("db_abc");
MongoCollection<Document> collec = database.getCollection(collectName);
List<Bson> pipeline = java.util.Collections.singletonList(Aggregates.match(Filters.or(
                        Document.parse("{}"),
                        Filters.in("operationType", asList("insert", "update", "delete")))));
MongoCursor<ChangeStreamDocument<Document>> cursor = collec.watch(pipeline).iterator();
while (cursor.hasNext()) {
	ChangeStreamDocument<Document> next = cursor.next();
	String Operation = next.getOperationType().getValue();
    ...
}

也可以在springboot中使用MessageListener方式:

/**
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
*/
    
public class MyObjMessageListener implements MessageListener<ChangeStreamDocument<Document>, MyObj> {
    @Override
    public void onMessage(Message<ChangeStreamDocument<Document>, MyObj> message) {
        MyObj obj = message.getBody();
        //TODO
    }
}

@Bean
MessageListenerContainer messageListenerContainer(MongoTemplate template) {
    Executor executor = Executors.newSingleThreadExecutor();
    return new DefaultMessageListenerContainer(template, executor) {
        @Override
        public boolean isAutoStartup() {
            return true;
        }
    };
}

@EventListener(ApplicationStartedEvent.class)
public void subscript() {
    MyObjMessageListener myObjMessageListener = new MyObjMessageListener();
    ChangeStreamRequest<MyObj> request = ChangeStreamRequest.builder(myObjMessageListener)
            .collection("myobj")
            .filter(newAggregation(match(where("operationType").in("insert", "update", "replace"))))
            .build();
    messageListenerContainer.register(request, MyObj.class);
}

其他参考

原文地址:https://www.cnblogs.com/huligong1234/p/14124755.html