首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

mongo变更流使用及windows下副本集五分钟搭建

编程知识
2024年08月10日 10:44

mongodb的变更流解释:

变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪  oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。(Change Streams - MongoDB Manual v5.0)

使用场景,需要websocket推送实时数据的时候,我们把数据写入mongo的同时,websocket实时监听mongo数据,拿到后推送到订阅组用户。

这里只做一端新增另一端服务监听测试,及windows下副本集快速搭建流程。

 

sub端代码

package main

import (
	"context"
	"fmt"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"log"
)

func main() {
	// 设置 MongoDB 客户端mongo单机模式不支持这种监听 单机报错 2024/08/10 11:18:54 (Location40573) The $changeStream stage is only supported on replica sets
	clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
	client, err := mongo.Connect(context.TODO(), clientOptions)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(context.TODO())

	// 获取数据库和集合
	collection := client.Database("testdb").Collection("items")

	// 设置 Change Stream
	pipeline := mongo.Pipeline{}
	changeStreamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
	changeStream, err := collection.Watch(context.TODO(), pipeline, changeStreamOptions)
	if err != nil {
		log.Fatal(err)
	}
	defer changeStream.Close(context.TODO())

	fmt.Println("开始监听 Change Stream...")

	// 读取 Change Stream
	for changeStream.Next(context.TODO()) {
		var changeEvent bson.M
		if err := changeStream.Decode(&changeEvent); err != nil {
			log.Fatal(err)
		}

		fmt.Printf("检测到更改: %+v\n", changeEvent)
	}

	if err := changeStream.Err(); err != nil {
		log.Fatal(err)
	}
}

 

pub端代码

package main

import (
    "context"
    "fmt"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    // 设置 MongoDB 客户端
    clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
        fmt.Println("连接 MongoDB 失败:", err)
        return
    }
    defer client.Disconnect(context.TODO())

    // 获取数据库和集合
    collection := client.Database("testdb").Collection("items")

    // 插入数据
    for i := 1; i <= 5; i++ {
        item := bson.D{{"name", fmt.Sprintf("item%d", i)}, {"value", i}}
        _, err := collection.InsertOne(context.TODO(), item)
        if err != nil {
            fmt.Println("插入数据失败:", err)
            return
        }
        //fmt.Printf("插入数据: %+v\n", item)
        fmt.Printf("插入数据第 %d 条", i)
        time.Sleep(2 * time.Second) // 模拟一些延迟
    }
}

执行结果 pub端

 执行结果 sub端

 

数据库不用新建集合,自动生成很方便

 

 

 

下面是windows下安装副本集步骤一字不拉

https://www.mongodb.com/try/download/community  下载zip包解压 bin目录同级创建data-data4(data内部需要创建好db目录),log-log4 
MongoDB shell version v5.0.28  
注意 data目录下没有db文件夹net start MongoDB执行服务起不来   192.168.2.6  本机ip
mongod.exe --config "E:\mongodb\mongod.conf" --serviceName "MongoDB"  --serviceDisplayName "MongoDB"  --install

mongod.exe --config "E:\mongodb\mongod1.conf" --serviceName "MongoDB1"  --serviceDisplayName "MongoDB1"  --install

mongod.exe --config "E:\mongodb\mongod2.conf" --serviceName "MongoDB2"  --serviceDisplayName "MongoDB2"  --install

mongod.exe --config "E:\mongodb\mongod3.conf" --serviceName "MongoDB3"  --serviceDisplayName "MongoDB3"  --install


net start MongoDB
net start MongoDB1
net start MongoDB2
net start MongoDB3

bin目录下打开cmd执行mongo.exe 

rs_conf={_id:"rs",
members:[
{_id:0,host:"192.168.2.6:27017",priority:1}, 
{_id:1,host:"192.168.2.6:27018",priority:2}, 
{_id:2,host:"192.168.2.6:27019",priority:3}, 
{_id:4,host:"192.168.2.6:27020", arbiterOnly:true}
]}

返回这个代表成功:
{
        "_id" : "rs",
        "members" : [
                {
                        "_id" : 0,
                        "host" : "192.168.2.6:27017",
                        "priority" : 1
                },
                {
                        "_id" : 1,
                        "host" : "192.168.2.6:27018",
                        "priority" : 2
                },
                {
                        "_id" : 2,
                        "host" : "192.168.2.6:27019",
                        "priority" : 3
                },
                {
                        "_id" : 4,
                        "host" : "192.168.2.6:27020",
                        "arbiterOnly" : true
                }
        ]
}

rs.initiate(rs_conf)  执行配置
{"ok":1}
rs.status() 查看状态
{
        "set" : "rs",
        "date" : ISODate("2024-08-10T02:40:20.391Z"),
        "myState" : 2,
        "term" : NumberLong(2),
        "syncSourceHost" : "192.168.2.6:27019",
        "syncSourceId" : 2,
        "heartbeatIntervalMillis" : NumberLong(2000),
        "majorityVoteCount" : 3,
        "writeMajorityCount" : 3,
        "votingMembersCount" : 4,
        "writableVotingMembersCount" : 3,
        "optimes" : {
                "lastCommittedOpTime" : {
                        "ts" : Timestamp(1723257616, 1),
                        "t" : NumberLong(2)
                },
                "lastCommittedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                "readConcernMajorityOpTime" : {
                        "ts" : Timestamp(1723257616, 1),
                        "t" : NumberLong(2)
                },
                "appliedOpTime" : {
                        "ts" : Timestamp(1723257616, 1),
                        "t" : NumberLong(2)
                },
                "durableOpTime" : {
                        "ts" : Timestamp(1723257616, 1),
                        "t" : NumberLong(2)
                },
                "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z")
        },
        "lastStableRecoveryTimestamp" : Timestamp(1723257586, 1),
        "electionParticipantMetrics" : {
                "votedForCandidate" : true,
                "electionTerm" : NumberLong(2),
                "lastVoteDate" : ISODate("2024-08-10T02:39:15.909Z"),
                "electionCandidateMemberId" : 2,
                "voteReason" : "",
                "lastAppliedOpTimeAtElection" : {
                        "ts" : Timestamp(1723257547, 5),
                        "t" : NumberLong(1)
                },
                "maxAppliedOpTimeInSet" : {
                        "ts" : Timestamp(1723257547, 5),
                        "t" : NumberLong(1)
                },
                "priorityAtElection" : 1,
                "newTermStartDate" : ISODate("2024-08-10T02:39:15.997Z"),
                "newTermAppliedDate" : ISODate("2024-08-10T02:39:16.928Z")
        },
        "members" : [
                {
                        "_id" : 0,
                        "name" : "192.168.2.6:27017",
                        "health" : 1,
                        "state" : 2,
                        "stateStr" : "SECONDARY",
                        "uptime" : 2677,
                        "optime" : {
                                "ts" : Timestamp(1723257616, 1),
                                "t" : NumberLong(2)
                        },
                        "optimeDate" : ISODate("2024-08-10T02:40:16Z"),
                        "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                        "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                        "syncSourceHost" : "192.168.2.6:27019",
                        "syncSourceId" : 2,
                        "infoMessage" : "",
                        "configVersion" : 1,
                        "configTerm" : 2,
                        "self" : true,
                        "lastHeartbeatMessage" : ""
                },
                {
                        "_id" : 1,
                        "name" : "192.168.2.6:27018",
                        "health" : 1,
                        "state" : 2,
                        "stateStr" : "SECONDARY",
                        "uptime" : 85,
                        "optime" : {
                                "ts" : Timestamp(1723257616, 1),
                                "t" : NumberLong(2)
                        },
                        "optimeDurable" : {
                                "ts" : Timestamp(1723257616, 1),
                                "t" : NumberLong(2)
                        },
                        "optimeDate" : ISODate("2024-08-10T02:40:16Z"),
                        "optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"),
                        "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                        "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                        "lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"),
                        "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.083Z"),
                        "pingMs" : NumberLong(0),
                        "lastHeartbeatMessage" : "",
                        "syncSourceHost" : "192.168.2.6:27017",
                        "syncSourceId" : 0,
                        "infoMessage" : "",
                        "configVersion" : 1,
                        "configTerm" : 2
                },
                {
                        "_id" : 2,
                        "name" : "192.168.2.6:27019",
                        "health" : 1,
                        "state" : 1,
                        "stateStr" : "PRIMARY",
                        "uptime" : 85,
                        "optime" : {
                                "ts" : Timestamp(1723257616, 1),
                                "t" : NumberLong(2)
                        },
                        "optimeDurable" : {
                                "ts" : Timestamp(1723257616, 1),
                                "t" : NumberLong(2)
                        },
                        "optimeDate" : ISODate("2024-08-10T02:40:16Z"),
                        "optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"),
                        "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                        "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),
                        "lastHeartbeat" : ISODate("2024-08-10T02:40:19.060Z"),
                        "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.022Z"),
                        "pingMs" : NumberLong(0),
                        "lastHeartbeatMessage" : "",
                        "syncSourceHost" : "",
                        "syncSourceId" : -1,
                        "infoMessage" : "",
                        "electionTime" : Timestamp(1723257555, 1),
                        "electionDate" : ISODate("2024-08-10T02:39:15Z"),
                        "configVersion" : 1,
                        "configTerm" : 2
                },
                {
                        "_id" : 4,
                        "name" : "192.168.2.6:27020",
                        "health" : 1,
                        "state" : 7,
                        "stateStr" : "ARBITER",
                        "uptime" : 85,
                        "lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"),
                        "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.092Z"),
                        "pingMs" : NumberLong(0),
                        "lastHeartbeatMessage" : "",
                        "syncSourceHost" : "",
                        "syncSourceId" : -1,
                        "infoMessage" : "",
                        "configVersion" : 1,
                        "configTerm" : 2
                }
        ],
        "ok" : 1,
        "$clusterTime" : {
                "clusterTime" : Timestamp(1723257616, 1),
                "signature" : {
                        "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
                        "keyId" : NumberLong(0)
                }
        },
        "operationTime" : Timestamp(1723257616, 1)
}

 

demo代码链接

go/mongochangestreamsdemo/demo at main · liuzhixin405/go (github.com)

mongo配置链接

config/mongo windows集群 at main · liuzhixin405/config (github.com)

From:https://www.cnblogs.com/morec/p/18352124
本文地址: http://shuzixingkong.net/article/961
0评论
提交 加载更多评论
其他文章 FFmpeg开发笔记(四十五)使用SRT Streamer开启APP直播推流
​SRT Streamer是一个安卓手机端的开源SRT协议直播推流框架,可用于RTMP直播和SRT直播。SRT Streamer支持的视频编码包括H264、H265等等,支持的音频编码包括AAC、OPUS等等,可谓功能强大的APP直播框架。 相比之下,另一款APP直播框架RTMP Streamer支
FFmpeg开发笔记(四十五)使用SRT Streamer开启APP直播推流 FFmpeg开发笔记(四十五)使用SRT Streamer开启APP直播推流
前端黑科技:使用 JavaScript 实现网页扫码功能
在数字化时代,二维码已经渗透到我们生活的方方面面。从移动支付到产品溯源,二维码凭借其便捷性和高效性,成为了信息传递的重要载体。而随着前端技术的不断发展,我们甚至可以使用 JavaScript 在网页端实现二维码扫描功能,为用户提供更加便捷的操作体验。 本文将带您深入了解如何使用 JavaScript
七夕——程序员独有的表白方式
七夕节,作为中国的传统情人节,虽然历来与诗词歌赋、浪漫约会紧密相连,但在当今的数字时代,程序员们也能以他们独有的方式,用代码和技术的力量来表达爱意。本文简要介绍了七种七夕节程序员常用的表白方式,包括编写定制程序、定制网页网站、二维码情书、游戏表白、数据分析情书、自动化提醒、代码表白;无论您是程序员还
CryptoHouse:由 ClickHouse 和 Goldsky 支持的免费区块链分析服务(ClickHouse 博客)
我们很高兴地宣布 CryptoHouse,在 crypto.clickhouse.com 上可访问,这是一个由 ClickHouse 提供支持的免费区块链分析服务。 https://crypto.clickhouse.com/ 现有的公共区块链分析服务通常需要定时、异步查询,而 ClickHouse
CryptoHouse:由 ClickHouse 和 Goldsky 支持的免费区块链分析服务(ClickHouse 博客) CryptoHouse:由 ClickHouse 和 Goldsky 支持的免费区块链分析服务(ClickHouse 博客) CryptoHouse:由 ClickHouse 和 Goldsky 支持的免费区块链分析服务(ClickHouse 博客)
使用SiliconCloud尝试GraphRag——以《三国演义》为例(手把手教程,适合小白)
本文介绍了使用SiliconCloud尝试GraphRag——以《三国演义》为例(手把手教程,适合小白)。
使用SiliconCloud尝试GraphRag——以《三国演义》为例(手把手教程,适合小白) 使用SiliconCloud尝试GraphRag——以《三国演义》为例(手把手教程,适合小白) 使用SiliconCloud尝试GraphRag——以《三国演义》为例(手把手教程,适合小白)
关键点检测(2)——关键点检测发展历程
关键点检测是计算机视觉领域中 一项重要任务,旨在识别图像或视频中具有特定意义或信息的关键点。如人脸上的鼻子,眼睛,或是关节等。在神经网络中,用于关键点检测的卷积神经网络(Convolutional Neural Networks, CNNs)有许多出名的模型。从DeepPose开始,直到现在的先进方
关键点检测(2)——关键点检测发展历程 关键点检测(2)——关键点检测发展历程 关键点检测(2)——关键点检测发展历程
为了落地DDD,我是这样“PUA”大家的
本文书接上回《先有鸡还是先有蛋?这是领域驱动设计落地最大的困局》 https://mp.weixin.qq.com/s/lzAZXgchCg_VyLmyo2N18Q 故事背景 2023年,我加入了一个全新的团队,担任技术Leader的角色,可以算做是“空降”吧,至今已经一年有余的时间了。到目前为止,
为了落地DDD,我是这样“PUA”大家的 为了落地DDD,我是这样“PUA”大家的 为了落地DDD,我是这样“PUA”大家的
win+jenkins+git+allure+tomcat+jdk部署(万人坑)
万人坑:之所以叫万人坑,是一些网站真的是非常非常非常坑,且不说按照他们的方法一次都没成功,还遇到了各种千奇百怪的问题,浪费了本人3.1415926h的时间,真想给他们竖起一个超大号的倒拇指。下面开始踩坑之路! 一.本地环境 jdk:1.8 tomcat:8.5.59 jenkins:2.264 gi
win+jenkins+git+allure+tomcat+jdk部署(万人坑) win+jenkins+git+allure+tomcat+jdk部署(万人坑) win+jenkins+git+allure+tomcat+jdk部署(万人坑)