Redis(六):消息队列
创始人
2025-05-30 20:43:46
0

前言

上一篇介绍了 Redis 是如何进行通信的。这节开始介绍一个常见的具体通信例子:消息队列。

消息队列(Message Queue)是一种常见的通信方式,它是一种异步通信模式,主要由发送者(也称生产者)、消息队列、接收者(也称消费者)组成,发送者负责将消息发送到消息队列中,消息队列负责对消息进行存储,接收者负责从消息队列中取出消息并进行处理。

乍一看好像很简单,但是想要实现一个稳定的消息队列,有很多细节都需要保证,比如以下三个基本要求:

  • 消息保序:消费者需要按照生产者发送消息的顺序来处理信息;
  • 处理重复消息:如果因为网络堵塞而出现消息重传,对于重复的消息只能处理一次;
  • 保证消息可靠性:如果因为故障导致消息没有处理完成,需要保证消息可靠性执行;

除此之外,还需要有很多扩展功能以应对不同的应用场景,所以小小的一个消息队列实现起来就成了一个小系统了。

市面上有很多成熟的消息队列系统,RabbitMQ、Kafka、ActiveMQ 等,Redis 也提供了三种消息队列的实现方式,来应对不同的场景。接下来我将简单介绍一下这三种实现方式:基于链表实现、发布订阅机制、Stream。

基于链表实现

参考消息队列的工作模式,最简单的实现方式就是使用链表来实现了,只需要用一个链表来存储数据,然后发送者在链表一端添加数据,而消费者在另一端取出数据即可。

为了解决重复消息的问题,可以给每一个数据添加一个全局唯一标识,来判断该消息是否已经处理过;为了保证消息的可靠性,可以开启 Redis 的持久化(后面会细说)。

可见,基于链表实现消息队列非常方便,而且可以满足于许多简单的应用场景。但是还有几个明显的问题:

  • 多个消费者不能消费同一条消息,一个消息消费完就会被删除;
  • 如果生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致链表中的消息越积越多,给内存带来很大压力。

发布/订阅机制

为了解决上述提到的两个问题,Redis 实现了发布/订阅(pub/sub)机制,允许多个客户端通过订阅特定的频道来接收消息。

Redis 的 SUBSCRIBE 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给所有订阅指定频道的客户端。

一旦客户端订阅了某个频道,它就会一直监听该频道,直到取消订阅或连接断开。在这个过程中,Redis 实例和客户端会保持一个 TCP 长连接用于传递消息。

基于频道的发布/订阅

发布者可以向指定的频道(channel)送消息;订阅者可以订阅一个或者多个频道,所有订阅此频道的订阅者都会收到此消息。

发布者发布消息的命令是 publish,用法是 publish channel message,该命令的返回值表示接收这条消息的订阅者数量。

订阅频道的命令是 subscribe,用法是 subscribe channel1 [channel2 …],客户端进入订阅状态后,只能使用 subscribeunsubscribepsubscribepunsubscribe 这四个属于"发布/订阅"的命令,否则会报错。

subscribe 订阅的频道只能通过 unsubscribe 取消订阅;unsubscribe 如果没有参数会取消订阅所有频道;

psubscribe/punsubscribe 表示订阅/取消“订阅模式”,也就是下文将会介绍的基于模式的发布/订阅;

进入订阅状态后客户端可能收到 3 种类型的回复,每种类型的回复都包含 3 个值:

  • subscribe:表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。
  • message:表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。
  • unsubscribe:表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。

具体如下所示:

127.0.0.1:6379> subscribe test
1) "subscribe"		#订阅成功
2) "test"			#订阅成功的频道
3) (integer) 1		#当前客户端订阅的频道数量#当发布者发布消息 publish test Hello,订阅者读取到的消息如下
1) "message"		#收到消息
2) "test"			#产生消息的频道
3) "Hello"			#消息内容
实现

底层是通过字典实现的,这个字典(pubsub_channels)就用于保存订阅频道的信息:字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。

在这里插入图片描述

频道订阅:订阅频道时先检查字段内部是否存在;不存在则为当前频道创建一个字典且创建一个链表存储客户端 ID;否则直接将客户端 ID 插入到链表中。

取消频道订阅:取消时将客户端 ID 从对应的链表中删除;如果删除之后链表已经是空链表了,则将会把这个频道从字典中删除。

发送数据:频道接收到消息后,会遍历链表,然后将消息推送给所有订阅该频道的客户端。

基于模式的发布/订阅

除了按照频道名订阅频道外,发布/订阅机制还支持基于模式的发布/订阅,即允许消费者通过指定一个模式来订阅多个频道。模式可以匹配通配符,例如 “?” 表示 1 个占位符,“*” 表示任意个占位符,“?*” 表示 1 个以上占位符。

具体使用方法与订阅频道时类似,只是命令前面加上了 p,如 psubscribe/punsubscribe 表示订阅/取消订阅频道。

使用 psubscribe 命令可以重复订阅同一个频道,如客户端执行了 psubscribe c? c?*。这时向 c1 发布消息客户端会接受到两条消息。同样的,如果有另一个客户端执行了 subscribe c1psubscribe c?* 的话,向 c1 发送一条消息该客户顿也会受到两条消息(但是是两种类型:message 和 pmessage)同时 publish 命令也返回 2。

通过订阅模式接收到的信息, 和通过订阅频道接收到的信息的格式不太一样:

  • pmessage :表示接收到的信息,第二个值表示被匹配的频道的名字,第三个值表示信息的实际内容。
实现

底层是 pubsubPattern 节点的链表。

typedef struct pubsubPattern {redisClient *client;	//订阅模式的客户端robj *pattern;			//被订阅的模式
} pubsubPattern;

在这里插入图片描述

模式订阅:新增一个 pubsubPattern 数据结构添加到链表的最后尾部,同时保存客户端 ID

取消模式订阅:从当前的链表 pubsubPatterns 结构中删除需要取消的模式订阅。

发送数据:当发布者发送消息时,会遍历所有的订阅模式进行模式匹配,再向匹配的客户端发送数据。如果订阅的模式数量很多,会耗费很多的计算资源,降低性能。

Redis 的发布/订阅机制,相较于链表实现,有以下优点:

  • 实现了多对多的消息传递:一个消息可以被多个订阅者接收,一个订阅者也可以订阅多个频道;
  • 推送消息:Redis 实现的推送消息,可以让订阅者不用轮询等待消息,避免了不必要的开销;

但是仍然存在着一些缺陷:依靠 Redis 自身的持久化不可靠、无法重复消费消息、无法实现历史消息、无法保证消息的传递等等。

Stream

之前在 Redis 的九大数据类型中就介绍过 Stream,是专门为消息队列设计的数据结构,具体的就不在这里再赘述了,总得来说就是 Stream 支持消息的持久化、自动生成全局唯一 ID、ack 确认消息的模式、消费组模式等,让消息队列更加的稳定和可靠。

但是 Stream 实现的消息队列与 Kafka、RabbitMQ 等专业的消息队列系统相比,仍然有一些缺陷:无法解决消息堆积的问题、无法保证消息的可靠性、不能支持一些如消息转发、过滤等高级功能。

但是,任何实现方式都有其优点和缺点,我们应该根据实际的业务场景来选择最合适的实现方式。

最后

本文介绍了 Redis 中的三种实现消息队列的方式。下一节将介绍 Redis 是如何实现数据可靠性的,即 Redis 的持久化。

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
JAVA多线程知识整理 Java多线程基础 线程的创建和启动 继承Thread类来创建并启动 自定义Thread类的子类&#...
【洛谷 P1090】[NOIP... [NOIP2004 提高组] 合并果子 / [USACO06NOV] Fence Repair G ...
国民技术LPUART介绍 低功耗通用异步接收器(LPUART) 简介 低功耗通用异步收发器...
城乡供水一体化平台-助力乡村振... 城乡供水一体化管理系统建设方案 城乡供水一体化管理系统是运用云计算、大数据等信息化手段࿰...
程序的循环结构和random库...   第三个参数就是步长     引入文件时记得指明字符格式,否则读入不了 ...
中国版ChatGPT在哪些方面... 目录 一、中国巨大的市场需求 二、中国企业加速创新 三、中国的人工智能发展 四、企业愿景的推进 五、...
报名开启 | 共赴一场 Flu... 2023 年 1 月 25 日,Flutter Forward 大会在肯尼亚首都内罗毕...
汇编00-MASM 和 Vis... Qt源码解析 索引 汇编逆向--- MASM 和 Visual Studio入门 前提知识ÿ...
【简陋Web应用3】实现人脸比... 文章目录🍉 前情提要🌷 效果演示🥝 实现过程1. u...
前缀和与对数器与二分法 1. 前缀和 假设有一个数组,我们想大量频繁的去访问L到R这个区间的和,...
windows安装JDK步骤 一、 下载JDK安装包 下载地址:https://www.oracle.com/jav...
分治法实现合并排序(归并排序)... 🎊【数据结构与算法】专题正在持续更新中,各种数据结构的创建原理与运用✨...
在linux上安装配置node... 目录前言1,关于nodejs2,配置环境变量3,总结 前言...
Linux学习之端口、网络协议... 端口:设备与外界通讯交流的出口 网络协议:   网络协议是指计算机通信网...
Linux内核进程管理并发同步... 并发同步并发 是指在某一时间段内能够处理多个任务的能力,而 并行 是指同一时间能够处理...
opencv学习-HOG LO... 目录1. HOG(Histogram of Oriented Gradients,方向梯度直方图)1...
EEG微状态的功能意义 导读大脑的瞬时全局功能状态反映在其电场结构上。聚类分析方法一致地提取了四种头表面脑电场结构ÿ...
【Unity 手写PBR】Bu... 写在前面 前期积累: GAMES101作业7提高-实现微表面模型你需要了解的知识 【技...