Redis学习笔记(六):Redis之消息发布、订阅机制

消息的发布和订阅,第一想到的是Kafka、RabbitMQ、ActiveMQ等,但是实际上Redis也是有这个功能,这个功能在Redis中实现很简单,也比较粗暴。没有存储,没有各种订阅模式。只要订阅同一个渠道的订阅者就都可以收到发布到该渠道的信息。如果没有订阅者,消息也不会缓存起来,而是直接丢弃。在简单的功能、能够接受这种模式并且有补偿机制的业务中是可以考虑使用的。下面看一下这个到底是怎么玩的。

订阅者(subscribe)

客户端订阅,直接使用Jedis的API即可,没有复杂花哨的内容。看一下代码实现:

Jedis resource = jedisPool.getResource();
resource.subscribe(new JedisPubSub() {//订阅
    @Override
    public void onMessage(String channel, String message) {
        executor.execute(() -> handler(channel, message));//异步处理
    }
}, "MESSAGE_SYNC_TOPIC_1","MESSAGE_SYNC_TOPIC_2");

具体JedisPool实例的构建不多做赘述,通过JedisPool获取Jedis实例,直接使用Jedis的sbuscribe方法即可。

  • 第一个参数是订阅后,消息回调的方法
  • 第二个参数是可变数组,可以同时订阅多个channel,如上代码是订阅了两个channel

注意:这里的回调方法是阻塞的,如果内部实现逻辑复杂,使用异步处理更好,上面的代码使用的是线程池。另外如果是多台服务器,注意加上Redis分布锁的控制,防止多台机器重复消费。

发布者(publish)

发布逻辑相对于订阅逻辑就简单很多,不多做解释。看代码实现:

try (Jedis resource = jedisPool.getResource()) { //获取jedis,这种写法jdk7及以上特性,会自动释放资源
    resource.publish("MESSAGE_SYNC_TOPIC_1", "message_content");//发布
} catch (Exception e) {
    log.error(LogUtils.pattern("异步发布异常,异常信息:{}"), e.getMessage());
}

代码逻辑很简单,这种处理一般都是异步处理,需要对发布消息逻辑做try……catch处理,防止异常导致原逻辑的回滚。特别是页面数据数据较多,本身业务逻辑处理很复杂,回滚后会对本身的功能有较大的影响。

总结

Redis的这种发布订阅方式没有什么难点,只要了解其特性,然后对存在的坑做一下规避即可,具体有哪些特性,我总结了下面几种,如果不全或者存在问题,希望得到您的指正。

  • Jedis客户端回调方法是阻塞的。数据量大,逻辑处理复杂,会导致本地堆集过多的消息,异步处理避坑
  • Jedis订阅方法subscribe本身也是阻塞的,如果你是在系统启动的时候去加载订阅者,注意异步避坑,防止阻塞,系统一直停在这个位置,不能完全启动
  • 消息分发方式是广播,对于同一个channel,每个订阅者都可以收到同样的消息,如果此消息不能被重复消费,注意使用分布式锁避坑
  • 如果没有订阅者,发布者的消息会被丢弃,没有存储机制,所以在发布消息之前,确保订阅者已经完成订阅操作

上面基本都是订阅的坑,发布没有啥,主要是订阅者要先于发布者启动。对于上面说的系统启动的时候完成订阅者的加载,可以使用spring的一些方法。提供以下两种实现方式。(实现方式有很多,这里只做为两种示例)

方式一:实现ApplicationContextAware接口,重写setApplicationContext方法

@Component
public class DemoConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Demo bean = applicationContext.getBean(Demo.class);//获取订阅逻辑实现的bean
        new Thread(() -> bean.subscribeStart()).start();//异步启动
    }
}

方式二:实现InitializingBean接口,重写afterPropertiesSet方法

@Component
public class Demo implements InitializingBean {
    //具体订阅逻辑
    private void sbuscribeStart(){}
    @Override
    public void afterPropertiesSet() throws Exception {
        new Thread(this::sbuscribeStart).start();//异步启动
    }
}
  • 0
    点赞
  • 1
    收藏
    觉得还不错? 一键收藏
  • 打赏
    打赏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

程序猿洞晓

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值