首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

Redis系列:使用Stream实现消息队列 (图文总结+Go案例)

编程知识
2024年08月07日 07:00

Redis24篇集合

1 先导

我们在《Redis系列14:使用List实现消息队列》这一篇中详细讨论了如何使用List实现消息队列,但同时也看到很多局限性,比如:

  • 不支持消息确认机制,没有很好的ACK应答
  • 不支持消息回溯,无法排查问题和做消息分析
  • List按照FIFO机制执行,所以存在消息堆积的风险。
  • 查询效率低,作为线性结构,List中定位一个数据需要进行遍历,O(N)的时间复杂度
  • 不存在消费组(Consumer Group)的概念,无法实现多个消费者组成分组进行消费

2 关于Stream

Redis Stream是Redis 5.0版本中引入的一种新的数据结构,它主要用于高效地处理流式数据,特别适用于消息队列、日志记录和实时数据分析等场景
以下是对Redis Stream的 主要特征:
1. 数据结构:Redis Stream是一个由有序消息组成的日志数据结构,每个消息都有一个全局唯一的ID,确保消息的顺序性和可追踪性。
2. 消息ID:消息的ID由两部分组成,分别是毫秒级时间戳和序列号。这种设计确保了消息ID的单调递增性,即新消息的ID总是大于旧消息的ID。
3. 消费者组Redis Stream支持消费者组的概念,允许多个消费者以组的形式订阅Stream,并且每个消息只会被组内的一个消费者处理,避免了消息的重复消费。

以及主要优势:

1. 持久化存储:Stream中的消息可以被持久化存储,确保数据不会丢失,即使在Redis服务器重启后也能恢复消息。
2. 有序性:消息按照产生顺序生成消息ID, 被添加到Stream中,并且可以按照指定的条件检索消息,保证了消息的有序性
3. 多播与分组消费:支持多个消费者同时消费同一流中的消息,并且可以将消费者组织成消费组,实现消息的分组消费
4. 消息确认机制:消费者可以通过XACK命令确认是否成功消费消息,保证消息至少背消费一次,确保消息不会被重复处理。
5. 阻塞读取:消费者可以选择阻塞读取模式,当没有新消息时,消费者会等待直至新消息到达。
6. 消息可回溯: 方便补数、特殊数据处理, 以及问题回溯查询

3 主要命令

1. XADD:向Stream中添加消息。如果指定的Stream不存在,则会自动创建。
2. XREAD:以阻塞/非阻塞方式获取Stream中的消息列表。
3. XREADGROUP:从消费者组中读取消息,支持阻塞读取。
4. XACK:确认消费者已经成功处理了消息。
5. XGROUP:用于管理消费者组,包括创建、设置ID、销毁消费者组等操作。
6. XPENDING:查询消费者组中的待处理消息。

3.1 XADD 消息记录

XADD命令用于向Redis Stream(流)数据结构中添加消息。

3.1.1 XADD 命令的基本语法

XADD stream_name [MAXLEN maxlen] [ID id] field1 value1 [field2 value2 ...]

1. stream_name:指定要添加消息的Stream的名字。
2. MAXLEN maxlen:可选参数,用于限制Stream的最大长度。当Stream的长度达到maxlen时,旧的消息会被自动删除。
3. ID id:可选参数,用于指定消息的ID。如果不指定该参数,Redis会自动生成一个唯一的ID。
4. field1 value1 [field2 value2 ...]:消息的字段和值,消息的内容以key-value的形式存在。

XADD命令的一个重要用途是实现消息发布功能,发布者可以使用XADD命令向Stream中添加消息。

3.1.2 XADD 示例

假设我们有一个名为userinfo_stream的Stream,并希望向其中添加一个包含sensor_idtemperature字段的消息,我们可以使用以下命令:

XADD userinfo_stream * user_name brand age 18

在这个例子中,*表示让Redis自动生成一个唯一的消息ID。消息包含两个字段:usernameage,它们的值分别是brand18。所以这边记录了一个用户信息,姓名为brand, 年龄18岁。

3.1.3 有啥需要注意的呢

  • 如果指定的Stream不存在,XADD命令会创建一个新的Stream
  • 消息的ID是唯一的,并且Redis会保证Stream中消息的ID是单调递增的。如果指定了ID,则新消息的ID必须大于Stream中现有的所有消息的ID。
  • 使用MAXLEN参数可以限制Stream的大小,这在处理大量消息时非常有用,可以避免Stream占用过多的内存或磁盘空间。

3.2 XREAD 消息消费

即将消息从队列中读取出来(消费)

3.2.1 XREAD 命令的基本语法

XREAD命令的基本语法如下:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

1. COUNT count:这是一个可选参数,用于指定一次读取的最大消息数量。如果不指定,默认为1。
2. BLOCK milliseconds:这也是一个可选参数,用于指定阻塞的时间(以毫秒为单位)。如果指定了阻塞时间,并且当前没有可消费的消息,客户端将在指定的时间内阻塞等待。如果不设置该参数或设置为0,则命令将立即返回,无论是否有可消费的消息。
3. STREAMS key [key ...] ID [ID ...]:这部分指定了要消费的流(Streams)和对应的起始消息ID。可以一次指定多个流和对应的起始ID。

XREAD命令的工作机制
1. 读取指定ID之后的消息:XREAD命令会返回指定ID之后的消息(不包含指定ID的消息本身)。如果没有指定ID,或者指定的ID不存在于流中,那么命令将从流的开始或结束处读取消息,具体取决于ID的值(如“0-0”表示从流的开始处读取,“$”表示从流的当前最大ID处读取)。
2. 阻塞读取:当设置了BLOCK参数后,如果当前没有可消费的消息,客户端将进入阻塞状态,直到有新的消息到达或阻塞时间超时。这种机制非常适合实现消费者等待生产者产生新消息的场景。
3. 支持多个流:XREAD命令支持同时从多个流中读取消息,只需在命令中指定多个流和对应的起始ID即可。

3.2.2 XREAD 示例

假设我们有一个名为userinfo_stream的流,并且想要从该流中读取消息。以下是一些示例:
1. 非阻塞读取最新消息

XREAD COUNT 1 STREAMS userinfo_stream $

这条命令会尝试从userinfo_stream流中读取最新的消息(如果有的话)。$是一个特殊ID,表示流的当前最大ID。

2. 阻塞读取最新消息

XREAD COUNT 1 BLOCK 1000 STREAMS userinfo_stream $

这条命令会阻塞1000毫秒,等待userinfo_stream流中出现新的消息。如果在1000毫秒内有新消息到达,则命令会返回该消息;否则,命令将超时并返回nil。

3. 从特定ID开始读取

XREAD COUNT 2 STREAMS userinfo_stream 1722159931000-0
1) 1) "userinfo_stream"
    2)  1) 1) "1722159931000-0"
         2) 1) "user_name"
             2) "brand"
             3) "age"
             4) "18"

这条命令会从userinfo_stream流中读取ID大于或等于1722159931000-0的消息,最多返回数据。

3.2.3 需要注意啥呢?

1. 消息ID的唯一性:在Redis Streams中,每个消息都有一个全局唯一的消息ID,这个消息ID由两部分组成:时间戳和序列号。时间戳表示消息被添加到流中的时间,序列号表示在同一时间戳内添加的消息的顺序。
2. 消费者组:虽然XREAD命令本身不直接涉及消费者组的概念,但Redis Streams还支持消费者组模式,允许一组消费者协作消费同一流中的消息。在消费者组模式下,通常会使用XREADGROUP命令而不是XREAD命令来读取消息。
3. 性能考虑:XREAD命令在读取大量消息时可能会消耗较多的CPU和内存资源。因此,在实际应用中需要根据实际情况合理设置COUNT参数的值,避免一次性读取过多消息导致性能问题。

3.3 Consumer Group 消费组模式

典型的多播模式,在实时性要求比较高的场景,如果你想加快对消息的处理。那这是一个不错的选择,我们让队列在逻辑上进行分区,用不同的消费组来隔离消费。所以:

image

消费者组允许多个消费者(client 或 process)协同处理同一个流(Stream)中的消息。每个消费者组维护自己的消费偏移量(即已处理消息的位置),以支持消费者之间的负载均衡和容错。

3.3.1 创建消费者组

使用 XGROUP CREATE 命令创建消费者组。

# stream_name:队列名称
# consumer_group:消费者组
# msgIdStartIndex:消息Id开始位置
# msgIdStartIndex:消息Id结束位置
# $ 表示从流的当前末尾(即最新消息)开始创建消费者组。如果流不存在,MKSTREAM 选项将自动创建流
XGROUP CREATE stream_name consumer_group msgIdStartIndex-msgIdStartIndex
# 或者
XGROUP CREATE stream_name consumer_group $ MKSTREAM

下面是具体实现示例,为队列 userinfo_stream 创建了消费组1(consumer_group1)和 消费组2(consumer_group2):

> xgroup create userinfo_stream consumer_group1 0-0
OK
> xgroup create userinfo_stream consumer_group2 0-0
OK

3.3.2 读取消息

消费者可以通过 XREADGROUP 命令从消费者组中读取消息。XREADGROUP 命令不仅读取消息,还会更新消费者组中的消费者状态,即标记哪些消息已被读取。

# group_name: 消费者群组名
# consumer_name: 消费者名称
# COUNT number: count 消费个数
# BLOCK ms: 表示如果流中没有新消息,则命令将阻塞最多 xx 毫秒,0则无限阻塞
# stream_name: 队列名称 
# id: 消息消费ID
# []:代表可选参数
# `>`:放在命令参数的最后面,表示从尚未被消费的消息开始读取;

XREADGROUP GROUP group_name consumer_name [COUNT number] [BLOCK ms] STREAMS stream_name [stream ...] id [id ...]
# 或者
XREADGROUP GROUP group_name consumer_name COUNT 1 BLOCK 2000 STREAMS stream_name >

下面是具体实现示例,消费组 consumer_group1 的消费者 consumer1 从 userinfo_stream 中以阻塞的方式读取一条消息:

XREADGROUP GROUP consumer_group1 consumer1 COUNT 1 BLOCK 0 STREAMS userinfo_stream >
1) 1) "userinfo_stream"
   2) 1) 1) "1722159931000-0"
         2) 1) "user_name"
            2) "brand"
            3) "age"
            4) "18"

3.3.3 确认消息

处理完消息后,消费者需要发送 XACK 命令来确认消息。这告诉 Redis 这条消息已经被成功处理,并且可以从消费者组的待处理消息列表中移除

# stream_name: 队列名称 
# group_name: 消费者群组名
# <message-id> 是要确认的消息的 ID。

XACK stream_name group_name <message-id>
# ACK 确认两条消息
XACK userinfo_stream consumer_group1 1722159931000-0 1722159932000-0
(integer) 2

3.3.4 PLE:消息可靠性保障

PEL(Pending Entries List)记录了当前被消费者读取但尚未确认(ACK)的消息。这些消息在消费者成功处理并发送ACK命令之前,会一直保留在PEL中。如果消费者崩溃或未能及时发送ACK命令,Redis将确保这些消息能够被重新分配给其他消费者进行处理,从而实现消息的可靠传递。

XPENDING stream_name group_name

以下的例子中,我们查看 userinfo_stream 中的 消费组 consumer_group1 中各个消费者已读取但未确认的消息信息。

XPENDING userinfo_stream consumer_group1
1) (integer) 2   # 未确认消息条数
2) "1722159931000-0"
3) "1722159932000-0"

详细的stream操作见官网文档:https://redis.io/docs/data-types/streams-tutorial/

4 使用Golang实现Stream队列能力

4.1 先安装go-redis/redis库

> go get github.com/go-redis/redis/v8
go: downloading github.com/go-redis/redis v6.15.9+incompatible
go: downloading github.com/go-redis/redis/v8 v8.11.5
go: downloading github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
go: downloading github.com/cespare/xxhash/v2 v2.1.2
go: added github.com/cespare/xxhash/v2 v2.1.2
go: added github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
go: added github.com/go-redis/redis/v8 v8.11.5

注意:这里的v8是库的版本号,你可以根据实际情况进行调整

逻辑实现

package main  
  
import (  
	"context"  
	"fmt"  
	"log"  
	"time"  
  
	"github.com/go-redis/redis/v8"  
)  
  
func main() {  
	// 连接到Redis  
	rdb := redis.NewClient(&redis.Options{  
		Addr:     "localhost:6379", // Redis地址  
		Password: "",               // 密码(如果有的话)  
		DB:       0,                // 使用默认DB  
	})  
  
	ctx := context.Background()  
  
	// 创建Stream  
	_, err := rdb.XAdd(ctx, &redis.XAddArgs{  
		Stream: "mystream",  
		Values: map[string]interface{}{  
			"field1": "value1",  
			"field2": "value2",  
		},  
	}).Result()  
	if err != nil {  
		log.Fatalf("Failed to add message to stream: %v", err)  
	}  
  
	// 创建Consumer Group  
	_, err = rdb.XGroupCreate(ctx, "mystream", "mygroup", "$").Result()  
	if err != nil && err != redis.Nil {  
		log.Fatalf("Failed to create consumer group: %v", err)  
	}  
  
	// 消费者读取消息  
	go func() {  
		for {  
			msgs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{  
				Group:    "mygroup",  
				Consumer: "myconsumer",  
				Streams:  []string{"mystream", ">"},  
				Count:    1,  
				Block:    1000, // 阻塞1000毫秒  
			}).Result()  
			if err != nil {  
				if err == redis.Nil {  
					// 超时,没有新消息  
					continue  
				}  
				log.Fatalf("Failed to read from stream: %v", err)  
			}  
  
			for _, msg := range msgs[0].Messages {  
				fmt.Printf("Received: %s %s\n", msg.ID, msg.Values)  
  
				// 确认消息  
				_, err = rdb.XAck(ctx, "mystream", "mygroup", msg.ID).Result()  
				if err != nil {  
					log.Fatalf("Failed to ack message: %v", err)  
				}  
			}  
		}  
	}()  
  
	// 模拟生产者继续发送消息  
	for i := 0; i < 5; i++ {  
		_, err := rdb.XAdd(ctx, &redis.XAddArgs{  
			Stream: "mystream",  
			Values: map[string]interface{}{  
				"field1": fmt.Sprintf("value%d", i+1),  
				"field2": "another value",  
			},  
			MaxLen:     100,  
			Approximate: true,  
		}).Result()  
		if err != nil {  
			log.Fatalf("Failed to add message to stream: %v", err)  
		}  
		time.Sleep(2 * time.Second) // 模拟生产间隔  
	}  
  
	// 注意:在实际应用中,主goroutine通常不会立即退出,而是会等待某些触发条件

5 应用场景

1. 消息队列:Redis Stream可以作为消息队列使用,支持消息的发布、订阅和消费。
2. 日志记录:将日志信息写入Redis Stream,方便后续的查询和分析。
3. 实时数据分析:结合Redis的其他数据结构(如Sorted Set、Hash等),对Stream中的数据进行实时分析。

6 总结

Redis Stream是Redis在消息队列和流式数据处理领域的一个重要补充,它提供了简单但功能强大的数据流处理能力,为开发者提供了更多的选择和灵活性。相对List,Stream的优势如下:

  • 支持消息确认机制(ACK应答确认)
  • 支持消息回溯,方便排查问题和做消息分析
  • 存在消费组(Consumer Group)的概念,可以进行分组消费和批量消费,可以负载多个消费实例
From:https://www.cnblogs.com/wzh2010/p/18030912
本文地址: http://www.shuzixingkong.net/article/864
0评论
提交 加载更多评论
其他文章 给我5分钟,保证教会你在vue3中动态加载远程组件
前言 在一些特殊的场景中(比如低代码、减少小程序包体积、类似于APP的热更新),我们需要从服务端动态加载.vue文件,然后将动态加载的远程vue组件渲染到我们的项目中。今天这篇文章我将带你学会,在vue3中如何去动态加载远程组件。 欧阳写了一本开源电子书vue3编译原理揭秘,这本书初中级前端能看懂。
给我5分钟,保证教会你在vue3中动态加载远程组件 给我5分钟,保证教会你在vue3中动态加载远程组件 给我5分钟,保证教会你在vue3中动态加载远程组件
增强用户体验:2个功能强大的.NET控制台应用帮助库
前言 对于.NET开发者而言,构建控制台应用程序时,如何提升用户交互的流畅性和满意度,是一个持续探索与优化的话题。今天大姚给大家分享2个功能强大的.NET控制台应用帮助库,希望可以帮助大家能够快速的构建漂亮、强交互性、丰富功能的控制台应用程序。 Terminal.Gui Terminal.Gui是一
增强用户体验:2个功能强大的.NET控制台应用帮助库 增强用户体验:2个功能强大的.NET控制台应用帮助库 增强用户体验:2个功能强大的.NET控制台应用帮助库
IntersectionObserver + scrollIntoView 实现电梯导航
电梯导航也被称为锚点导航,当点击锚点元素时,页面内相应标记的元素滚动到视口。而且页面内元素滚动时相应锚点也会高亮。电梯导航一般把锚点放在左右两侧,类似电梯一样。常见的电梯导航效果如下,比如一些官方文档中: 之前可能会用 getBoundingClientRect() 判断元素是否在视口中来实现类似效
IntersectionObserver + scrollIntoView 实现电梯导航 IntersectionObserver + scrollIntoView 实现电梯导航 IntersectionObserver + scrollIntoView 实现电梯导航
c#12 实验特性Interceptor如何使用的一个简单但完整的示例
一直有很多转载dotnet对Interceptor说明文档的,但鲜有说明Interceptor如何使用的,这里写一篇简单示例来展示一下 c# 12 实验特性Interceptor 是什么? 官方解释如下(其实简单说就是语言特性中内置的静态编织方式的aop功能,不同于其他il修改代码的方式,使用上得结
结合实例看 maven 传递依赖与优先级,难顶也得上丫
开心一刻 想买摩托车了,但是钱不够,想找老爸借点 我:老爸,我想买一辆摩托车,上下班也方便 老爸:你表哥上个月骑摩托车摔走了,你不知道?还要买摩托车? 我:对不起,我不买了 老板:就是啊,骑你表哥那辆得了呗,买啥新的 先抛问题 关于 maven 的依赖(dependency),我相信大家多少都知道点
结合实例看 maven 传递依赖与优先级,难顶也得上丫 结合实例看 maven 传递依赖与优先级,难顶也得上丫 结合实例看 maven 传递依赖与优先级,难顶也得上丫
Cython将Numpy数组转为自定义结构体
这篇文章介绍了在Cython中定义结构体,并在Python的Numpy数组/MemoryView和自定义结构体之间进行数据转换的方法。Cython有着非常Pythonic的编程范式,又具有接近于C语言的性能,对于Python开发者而言确实是一个很棒的工具。
《最新出炉》系列初窥篇-Python+Playwright自动化测试-65 - Canvas元素推拽-番外篇
1.简介 上一篇宏哥想了好多办法都没有演示成功的拖拽Canvas元素,宏哥也说的太绝对了,给大家造成困惑或者误导。一连好几天吃饭睡觉都不怎么香了,脑子中始终对这件事耿耿于怀,自己问自己难道就真的没有办法了吗?突然想到了一种办法抱着试一下的心态,结果出乎意料但是又在情理之中:成功推拽了!!!此刻地心情
《最新出炉》系列初窥篇-Python+Playwright自动化测试-65 - Canvas元素推拽-番外篇 《最新出炉》系列初窥篇-Python+Playwright自动化测试-65 - Canvas元素推拽-番外篇 《最新出炉》系列初窥篇-Python+Playwright自动化测试-65 - Canvas元素推拽-番外篇
后端开发学习敏捷需求-->专题的目标与价值成效
专题的目标与价值成效 什么是专题 公司或企业为了抓住业务机会或者解决痛点问题,而采取的具体的行动和举措 专题的目标分析 1.业务调研了解目标的预期 利用5W2H来进行专题分析 what——是什么?目的是什么?作什么工作? 专题是什么 专题产生的背景是什么 专题的目标是什么,要达到怎样的预期,包含那些
后端开发学习敏捷需求-->专题的目标与价值成效