mongodb的变更流解释:
变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。(Change Streams - MongoDB Manual v5.0)
使用场景,需要websocket推送实时数据的时候,我们把数据写入mongo的同时,websocket实时监听mongo数据,拿到后推送到订阅组用户。
这里只做一端新增另一端服务监听测试,及windows下副本集快速搭建流程。
sub端代码
package main import ( "context" "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "log" ) func main() { // 设置 MongoDB 客户端mongo单机模式不支持这种监听 单机报错 2024/08/10 11:18:54 (Location40573) The $changeStream stage is only supported on replica sets clientOptions := options.Client().ApplyURI("mongodb://localhost:27017") client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { log.Fatal(err) } defer client.Disconnect(context.TODO()) // 获取数据库和集合 collection := client.Database("testdb").Collection("items") // 设置 Change Stream pipeline := mongo.Pipeline{} changeStreamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := collection.Watch(context.TODO(), pipeline, changeStreamOptions) if err != nil { log.Fatal(err) } defer changeStream.Close(context.TODO()) fmt.Println("开始监听 Change Stream...") // 读取 Change Stream for changeStream.Next(context.TODO()) { var changeEvent bson.M if err := changeStream.Decode(&changeEvent); err != nil { log.Fatal(err) } fmt.Printf("检测到更改: %+v\n", changeEvent) } if err := changeStream.Err(); err != nil { log.Fatal(err) } }
pub端代码
package main import ( "context" "fmt" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) func main() { // 设置 MongoDB 客户端 clientOptions := options.Client().ApplyURI("mongodb://localhost:27017") client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { fmt.Println("连接 MongoDB 失败:", err) return } defer client.Disconnect(context.TODO()) // 获取数据库和集合 collection := client.Database("testdb").Collection("items") // 插入数据 for i := 1; i <= 5; i++ { item := bson.D{{"name", fmt.Sprintf("item%d", i)}, {"value", i}} _, err := collection.InsertOne(context.TODO(), item) if err != nil { fmt.Println("插入数据失败:", err) return } //fmt.Printf("插入数据: %+v\n", item) fmt.Printf("插入数据第 %d 条", i) time.Sleep(2 * time.Second) // 模拟一些延迟 } }
执行结果 pub端
执行结果 sub端
数据库不用新建集合,自动生成很方便
下面是windows下安装副本集步骤一字不拉
https://www.mongodb.com/try/download/community 下载zip包解压 bin目录同级创建data-data4(data内部需要创建好db目录),log-log4 MongoDB shell version v5.0.28 注意 data目录下没有db文件夹net start MongoDB执行服务起不来 192.168.2.6 本机ip mongod.exe --config "E:\mongodb\mongod.conf" --serviceName "MongoDB" --serviceDisplayName "MongoDB" --install mongod.exe --config "E:\mongodb\mongod1.conf" --serviceName "MongoDB1" --serviceDisplayName "MongoDB1" --install mongod.exe --config "E:\mongodb\mongod2.conf" --serviceName "MongoDB2" --serviceDisplayName "MongoDB2" --install mongod.exe --config "E:\mongodb\mongod3.conf" --serviceName "MongoDB3" --serviceDisplayName "MongoDB3" --install net start MongoDB net start MongoDB1 net start MongoDB2 net start MongoDB3 bin目录下打开cmd执行mongo.exe rs_conf={_id:"rs", members:[ {_id:0,host:"192.168.2.6:27017",priority:1}, {_id:1,host:"192.168.2.6:27018",priority:2}, {_id:2,host:"192.168.2.6:27019",priority:3}, {_id:4,host:"192.168.2.6:27020", arbiterOnly:true} ]} 返回这个代表成功: { "_id" : "rs", "members" : [ { "_id" : 0, "host" : "192.168.2.6:27017", "priority" : 1 }, { "_id" : 1, "host" : "192.168.2.6:27018", "priority" : 2 }, { "_id" : 2, "host" : "192.168.2.6:27019", "priority" : 3 }, { "_id" : 4, "host" : "192.168.2.6:27020", "arbiterOnly" : true } ] } rs.initiate(rs_conf) 执行配置 {"ok":1} rs.status() 查看状态 { "set" : "rs", "date" : ISODate("2024-08-10T02:40:20.391Z"), "myState" : 2, "term" : NumberLong(2), "syncSourceHost" : "192.168.2.6:27019", "syncSourceId" : 2, "heartbeatIntervalMillis" : NumberLong(2000), "majorityVoteCount" : 3, "writeMajorityCount" : 3, "votingMembersCount" : 4, "writableVotingMembersCount" : 3, "optimes" : { "lastCommittedOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "lastCommittedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "readConcernMajorityOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "appliedOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "durableOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z") }, "lastStableRecoveryTimestamp" : Timestamp(1723257586, 1), "electionParticipantMetrics" : { "votedForCandidate" : true, "electionTerm" : NumberLong(2), "lastVoteDate" : ISODate("2024-08-10T02:39:15.909Z"), "electionCandidateMemberId" : 2, "voteReason" : "", "lastAppliedOpTimeAtElection" : { "ts" : Timestamp(1723257547, 5), "t" : NumberLong(1) }, "maxAppliedOpTimeInSet" : { "ts" : Timestamp(1723257547, 5), "t" : NumberLong(1) }, "priorityAtElection" : 1, "newTermStartDate" : ISODate("2024-08-10T02:39:15.997Z"), "newTermAppliedDate" : ISODate("2024-08-10T02:39:16.928Z") }, "members" : [ { "_id" : 0, "name" : "192.168.2.6:27017", "health" : 1, "state" : 2, "stateStr" : "SECONDARY", "uptime" : 2677, "optime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"), "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "syncSourceHost" : "192.168.2.6:27019", "syncSourceId" : 2, "infoMessage" : "", "configVersion" : 1, "configTerm" : 2, "self" : true, "lastHeartbeatMessage" : "" }, { "_id" : 1, "name" : "192.168.2.6:27018", "health" : 1, "state" : 2, "stateStr" : "SECONDARY", "uptime" : 85, "optime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDurable" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"), "optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"), "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"), "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.083Z"), "pingMs" : NumberLong(0), "lastHeartbeatMessage" : "", "syncSourceHost" : "192.168.2.6:27017", "syncSourceId" : 0, "infoMessage" : "", "configVersion" : 1, "configTerm" : 2 }, { "_id" : 2, "name" : "192.168.2.6:27019", "health" : 1, "state" : 1, "stateStr" : "PRIMARY", "uptime" : 85, "optime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDurable" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"), "optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"), "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastHeartbeat" : ISODate("2024-08-10T02:40:19.060Z"), "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.022Z"), "pingMs" : NumberLong(0), "lastHeartbeatMessage" : "", "syncSourceHost" : "", "syncSourceId" : -1, "infoMessage" : "", "electionTime" : Timestamp(1723257555, 1), "electionDate" : ISODate("2024-08-10T02:39:15Z"), "configVersion" : 1, "configTerm" : 2 }, { "_id" : 4, "name" : "192.168.2.6:27020", "health" : 1, "state" : 7, "stateStr" : "ARBITER", "uptime" : 85, "lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"), "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.092Z"), "pingMs" : NumberLong(0), "lastHeartbeatMessage" : "", "syncSourceHost" : "", "syncSourceId" : -1, "infoMessage" : "", "configVersion" : 1, "configTerm" : 2 } ], "ok" : 1, "$clusterTime" : { "clusterTime" : Timestamp(1723257616, 1), "signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="), "keyId" : NumberLong(0) } }, "operationTime" : Timestamp(1723257616, 1) }
demo代码链接
go/mongochangestreamsdemo/demo at main · liuzhixin405/go (github.com)
mongo配置链接
config/mongo windows集群 at main · liuzhixin405/config (github.com)