Redis实现消息队列

在Redis中提供了三种实现消息队列的方式:

  1. List结构:基于List结构来模拟消息队列
  2. PubSub:基本的点对点消息模型
  3. Stream:较完善的消息队列模型

1. List实现消息队列

Redis的List数据结构类型是一个双向链表,而队列要求进,出口不能在同一个位置,所以可以利用List的添加取出命令来实现模拟消息队列。

  1. LPUSH,RPOP
  2. RPUSH,LPOP

但是java在消费消息的时候,如果没有消息了,消费者应该是阻塞等待,等到有消息投递了,再继续消费信息,而上述命令不是阻塞式的,如果没有消息了还在获取的话会获取到Null。所以应该实现阻塞的效果用下列命令

  1. BRPOP
  2. BLPOP

上述两个命令的取出效果是阻塞式的。

List实现消息队列的缺点:

  1. 无法避免消息丢失:例如消费者拿到消息还没有消费就宕机了
  2. 只能支持单个消费

2. 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  1. SUBSCRIBE channel [channel] :订阅一个或多个频道
  2. PUBLISH channel msg :向一个频道发送消息
  3. PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

这里的PSUBSCRIBE与RabbitMQ的匹配相似。

基于PubSub的消息队列的缺点:

  1. List支持数据持久化,但是PubSub不支持数据持久化

3. 基于Stream的消息队列

Stream是Redis5.0引入的新的数据类型,可以实现一个功能较为完善的消息队列

添加命令
在这里插入图片描述

例如

XADD users * name jack age 21

users是队列,*表示消息id ,后面的部分表示消息体

消费命令

在这里插入图片描述
当ID为$时代表读取最新的消息。

例如

XREAD COUNT 1 STREAMS users 0

COUNT 1 代表每次只读取一条,STREAMS users 表示从users这个队列里读取

注意:Stream的消息队列消费消息后是不会剔除该消息的

缺点:当指定ID为$,代表读取最新的消息,如果在处理一条新消息的时候,突然来了5条消息,当再次读取最新消息时,只能读取到5条消息的最后一条,造成消息漏读的现象

Stream消息队列的优点:

  1. 消息可回溯(消费后不会被剔除)
  2. 消息可以被多个消费者读取
  3. 可以阻塞读取

3.1 消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,同一个消费者组里的消费者之间处于一种竞争的关系,消息是不会出现消费重复的,同时一定程度上也可以避免消息漏读的现象
  2. 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
  3. 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移

如何创建消费者组?

XGROUP CREATE key groupName ID [MKSTREAM]
  1. key:队列名称
  2. groupName:消费者组名称
  3. ID:起始ID标识,$代表队列中最后一个消息,0代表队列中第一个消息
  4. MKSTREAM:队列不存在时自动创建

在这里插入图片描述

如何从消费者组读取消息?

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key..] ID [ID..]
  1. group:消费者组名称
  2. consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  3. count:本次查询最大数量
  4. BLOCK milliseconds:是否阻塞?阻塞的时间
  5. NOACK:消费消息后不响应
  6. STREAMS key:指定队列名称
  7. ID:获取消息的起始ID >表示从下一个未消费的消息开始 。其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

那么消费者消费完消息后如何确认消息呢?

XACK key group ID [ID..]
  1. key:队列名称
  2. group:消费者组名称
  3. ID:消息的ID

java手动模拟消费者监听消息的代码

while(true){
            Object message = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
            if (message == null){
                continue;
            }
            try{
                // 处理消息的逻辑 处理完毕后要ACK
                handleMessage(message);
            }catch (Exception e){
                while (true){
                    // 从等待响应的队列里拿消息
                    Object unAckMessage = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
                    if (unAckMessage == null){
                        continue;
                    }
                    try {
                        handleMessage(unAckMessage);
                    }catch (Exception e1){
                        continue;
                    }
                }
            }
        }
  • 5
    点赞
  • 66
    收藏
    觉得还不错? 一键收藏
  • 2
    评论
Redis可以通过使用Stream数据结构来实现消息队列。在Redis中,消息队列可以使用XADD命令将消息发送到队列中,使用XREADGROUP命令从队列中读取消息,并使用XACK命令确认消息已被处理。 首先,可以使用XADD命令将消息发送到队列中。该命令的语法如下: XADD key [MAXLEN|MINID [=|~] threshold [LIMIT count]] field value [field value ...] 其中,key是队列的名称,field和value是消息的键值对。可以使用*作为field来自动生成消息的唯一ID。 然后,可以使用XREADGROUP命令从队列中读取消息。该命令的语法如下: XREADGROUP GROUP 消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 > 其中,消费组名称和消费者名称用于标识消费者,每次读取的消息数量和阻塞时间可以根据需求进行设置,队列名称是要读取消息的队列。 在消费者代码中,可以使用一个无限循环来监听消息,并使用XREADGROUP命令来获取消息。如果没有消息,可以继续循环等待。处理完消息后,需要调用XACK命令确认消息已被处理。 以下是一个Java代码示例,手动模拟消费者监听消息的过程: ```java while (true) { Object message = redis.call("XREADGROUP GROUP 消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >"); if (message == null) { continue; } try { // 处理消息的逻辑 handleMessage(message); // 处理完毕后确认消息已被处理 redis.call("XACK 队列名称 消费组名称 消息ID"); } catch (Exception e) { while (true) { Object unAckMessage = redis.call("XREADGROUP GROUP 消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >"); if (unAckMessage == null) { continue; } try { handleMessage(unAckMessage); redis.call("XACK 队列名称 消费组名称 消息ID"); } catch (Exception e1) { continue; } } } } ``` 在上述代码中,handleMessage方法用于处理消息的逻辑。如果处理消息时发生异常,可以将消息放回队列中等待重新处理。 总结起来,Redis可以通过使用Stream数据结构和相关命令来实现消息队列。通过XADD命令将消息发送到队列中,使用XREADGROUP命令从队列中读取消息,并使用XACK命令确认消息已被处理。以上是一个简单的示例代码,可以根据实际需求进行修改和扩展。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值