Redis实现消息队列的4种方案

redis实现消息队列的集中方案:
1、基于List的LPUSH和BRPOP实现
2、PUB/SUB,订阅/发布模式
3、基于Sorted-Set的实现
4、基于Stream类型的实现
 

1、基于list列表的lpush和brpop实现消息队列

生产者,模拟30个商品id,存入list列表
<?php
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis->select(0);
$goodsId = 30;
$key = "sec_kill:active:active_1";
// 循环添加商品
for ($i=1;$i<=$goodsId;$i++){
    $redis->rPush($key,$i);
}

 消费者,把商品id从list列表取出来,模拟用户id,一 一对应30对,存入hash,剩下2970用户秒杀失败,存一个数量,并且把失败用户的编号存入列表list

<?php
// 假装是用户的唯一标识
$uuid = md5(uniqid('user').time());
// 创建链接redis对象
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis->select(0);
$goodsKey = "sec_kill:active:active_1";
$orderKey = "sec_kill:active:order_1";
$failUserNum = "sec_kill:active:fail_user_num";
$failUser = "sec_kill:active:fail_user";
if ($goodsId = $redis->lPop($goodsKey)){
    // 秒杀成功
    // 把幸运用户存在集合中
    $redis->hSet($orderKey,$goodsId,$uuid);
    echo "下单成功".PHP_EOL;
}else{
    // 秒杀失败  将失败用户计数
    $redis->incr($failUserNum);
    $redis->rPush($failUser,$uuid);
    echo "下单失败".PHP_EOL;
}

 按理说,应该有监听者watcher,监听watcher的list,是否还有商品没有消费,如果过期没消费,则应该存入生产列表,再次循环消费(博客观看者,这一段可以不用看,作者写给自己的,因为和上下文不对应,可能把你看迷糊)

2、redis发布/订阅(PUB/SUB)实现消息队列

(1)首先,介绍2组概念

channel:频道,管道,信道,它就是一个通道,所有订阅了该频道的客户端,都可以收到该频道发出的消息

pattern:模式,或者说,多个频道的集合,(有点类似正则的感觉),订阅了该模式,所有能匹配到的频道所发出的消息,该客户端都能收到

(2)channel和pattern底层实现原理

当一个客户端执行subscribe命令,订阅某个或某些频道的时候,这个客户端于被订阅频道之间就建立起了一种订阅关系。

Redis将所有频道的订阅关系都保存在服务器状态的pubsub_channels字典里面,这个字典的键是被订阅的频道,而键的值则是一个链表,链表里面记录了所有订阅这个频道的客户端,底层C语言实现原理:


struct redisServer {
 
    // ...
 
    // 保存所有频道的订阅关系
    dict *pubsub_channels;
 
    // ...
 
};

根据频道是否已有其他订阅者,关联操作分为两种情况执行:

1、如果频道已有其他订阅者,那么它在pubsub_channels字典中必然有相应的订阅者链表,程序唯一要做的就是将客户端添加到订阅者链表的末尾

2、如果频道还没有任何订阅者,那么它必然不存在与pubsub_channels字典,程序首先要在pubsub_channels字典中为频道创建一个键,并将这个键的值设置为空链表,然后再将客户端添加到链表,成为链表的第一个元素

 subscribe命令的实现可以用以下伪代码来描述:

def subscribe(*all_input_channels):
 
    # 遍历输入的所有频道
    for channel in all_input_channels:
 
        # 如果 channel 不存在于 pubsub_channels 字典(没有任何订阅者)
        # 那么在字典中添加 channel 键,并设置它的值为空链表
        if channel not in server.pubsub_channels:
            server.pubsub_channels[channel] = []
 
        # 将订阅者添加到频道所对应的链表的末尾
        server.pubsub_channels[channel].append(client)

声明:这一部分借鉴了《RabbitMQ兔老大》,本文秉着友好交流学习的目的做一下个人总结,不做任何商用,原文连接:https://blog.csdn.net/hebtu666/article/details/114827837

 unsubscribe命令和subscribe命令行为正好相反,当一个客户端退订某个或者某些频道时,服务器将从pubsub_channels中解除客户端于被退订频道之间的关联:

1、程序会根据被退订频道的名字, 在 pubsub_channels 字典中找到频道对应的订阅者链表, 然后从订阅者链表中删除退订客户端的信息

2、如果删除退订客户端之后, 频道的订阅者链表变成了空链表, 那么说明这个频道已经没有任何订阅者了, 程序将从 pubsub_channels 字典中删除频道对应的键

 unsubscribe命令的实现可以用以下伪代码来描述:


def unsubscribe(*all_input_channels):
 
    # 遍历要退订的所有频道
    for channel in all_input_channels:
 
        # 在订阅者链表中删除退订的客户端
        server.pubsub_channels[channel].remove(client)
 
        # 如果频道已经没有任何订阅者了(订阅者链表为空)
        # 那么将频道从字典中删除
        if len(server.pubsub_channels[channel]) == 0:
            server.pubsub_channels.remove(channel)

模式部分:

服务器将所有模式的订阅者关系存在了pubsub_Patterns属性里


struct redisServer {
 
    // 保存所有频道的订阅关系
    list *pubsub_patterns;
 
};

pubsub_Patterns属性是一个链表,每个结点是被订阅的模式,节点内记录了模式,节点内的client属性记录了订阅模式的客户端


typedef struct pubsubPattern{
    //订阅模式的客户端
    redisClient *client;
    //被订阅的模式
    robj *pattern;
}pubsubPattern;

每当客户端执行PSUBSCRIBE这个命令来订阅某个或某些模式时,服务器会对每个被订阅的模式执行下面的操作:

1)新建一个pubsubPattern结构,设置好两个属性

2)将新节点加到pubsub_patterns尾部

伪代码实现:


def osubscribe(*all_input_patterns):
    #遍历所有输入的模式
    #记录被订阅的模式和对应的客户端
    pubsubPattern=create()
    pubsubPattern.client=client
    pubsubPattern.pattern=pattern
 
    #插入链表末尾
    server.pub_patterns.append(pubsubPattern)

模式退订命令PUNSUBSCRIBE是PSUBSCRIBE的反操作

服务器将找到并删除那些被退订的模式


def osubscribe(*all_input_patterns):
    #遍历所有退订的模式
    for pattern in all_input_patterns:
        #遍历每一个节点
        for pubsubPattern in server.pubsub_patterns:
            #如果客户端和模式都相同
            if client==pubsubPattern.client:
                if pattern==pubsubPattern.pattern:
                    #删除
                    server.pub_patterns.remove(pubsubPattern)

发送消息:

当一个客户端执行PUBLISH<channel> <message>命令将消息发送给频道时,服务器需要:

1)把消息发送给所有本频道的订阅者

具体做法就是去pubsub_channels字典找到本频道的链表,也就是订阅名单,然后发消息

2)将消息发给,包含本频道的所有模式中的所有订阅者

具体做法就是去pubsub_patterns查找包含本频道的模式,并且把消息发送给订阅它们的客户端。


查看订阅信息:

redis2.8新增三个命令,用来查看频道和模式的相关信息。

PUBLISH CHANNELS[pattern]用于返回服务器当前被订阅的频道,pattern可写可不写,不写就查看所有,否则查看与pattern匹配的对应频道

这个子命令是通过遍历pubsub_channels字典实现的。

PUBLISH NUMSUB[CHANNEL-1 CHANNEL-2.....]返回这些频道的订阅者数量

这个子命令是通过遍历pubsub_channels字典,查看对应链表长度实现的。

PUBLISH NUMPAT返回被订阅模式数量

这个子命令是通过返回pubsub_patterns的长度实现的。

总而言之,PUBSUB 命令的三个子命令都是通过读取 pubsub_channels 字典和 pubsub_patterns 链表中的信息来实现的。

个人实操:

四个客户端 A、B、C、D,A客户端订阅了频道 news.it     D客户端订阅了频道news.et     BC客户端订阅了模式  news.[ie]t,此处中括号内,ie两个字母部分先后顺序

A命令:subscribe news.it

B命令:psubscribe news.[ie]t

C命令:psubscribe news.[ie]t

D命令:subscribe news.et

 当我们发布消息时:publish news.it  'She is not boy!'        请看效果图:

 当我们发布订阅消息时:publish news.et 'He is not girls! And  this boy is very cute!'

 注:从左到右,从上到下,一次分别为客户端  ABCD   可见模式类似正则,匹配到多少频道,当这些频道发消息时,订阅该频道的客户端,以及订阅相关模式的客户端,都会收到消息

 下面:通过PHP代码,尽可能去实现消息队列

今天没写完,请翻看本博主,下一篇redis讲解,明日写完

  • 1
    点赞
  • 8
    收藏
    觉得还不错? 一键收藏
  • 打赏
    打赏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

柔情柴少

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值