基于redis订阅消息和websocket技术实现的消息推送功能

相关依赖文件

03d66f8351e1fad6019c3b5871ffd499150.jpg

创建Redis消息监听者容器

0e9699999701a24d7f1c8ebd5ac04de87a0.jpg

创建Websocket配置类

 a7e3b8e16be139f4190b8da0ed7d3972765.jpg

   这个配置类的作用是要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。如果是使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
本文的例子是采用的springboot的内置tomcat容器,所以还是要创建这个配置类,作用就是注入ServerEndpointExporter。
 
 

创建消息订阅监听者类

5622c290b774236b13fc32353f741aca5be.jpg

 这个消息订阅监听者类持有websocket的客户端会话对象(session),当接收到订阅的消息时,通过这个会话对象(session)将消息发送到前端,从而实现消息的主动推送。

 

创建Websocket服务端类

@Component

@ServerEndpoint("/websocket/server")

public class WebSocketServer {

    /**

     * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例

     */

    private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。

     private static  AtomicInteger onlineCount=new AtomicInteger(0);

     //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识

     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

     //与某个客户端的连接会话,需要通过它来给客户端发送数据

     private Session session;

     private SubscribeListener subscribeListener;

    /**

     * 连接建立成功调用的方法

     * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据

     */

    @OnOpen

    public void onOpen(Session session){

        this.session = session;

        webSocketSet.add(this);     //加入set中

        addOnlineCount();           //在线数加1

        System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());

        subscribeListener = new SubscribeListener();

        subscribeListener.setSession(session);

        //设置订阅topic

        redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("TOPIC"));

    }

    /**

     * 连接关闭调用的方法

     */

    @OnClose

    public void onClose() throws IOException {

        webSocketSet.remove(this);  //从set中删除

        subOnlineCount();           //在线数减1

        redisMessageListenerContainer.removeMessageListener(subscribeListener);

        System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());

    }



    /**

     * 收到客户端消息后调用的方法

     * @param message 客户端发送过来的消息

     * @param session 可选的参数

     */

    @OnMessage

    public void onMessage(String message, Session session) {

        System.out.println("来自客户端的消息:" + message);

        //群发消息

        for(WebSocketServer item: webSocketSet){

            try {

                item.sendMessage(message);

            } catch (IOException e) {

                e.printStackTrace();

                continue;

            }

        }

    }



    /**

     * 发生错误时调用

     * @param session

     * @param error

     */

    @OnError

    public void onError(Session session, Throwable error){

        System.out.println("发生错误");

        error.printStackTrace();

    }



    /**

     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。

     * @param message

     * @throws IOException

     */

    public void sendMessage(String message) throws IOException {

        this.session.getBasicRemote().sendText(message);

    }



    public   int getOnlineCount() {

        return onlineCount.get();

    }



    public   void addOnlineCount() {

        WebSocketServer.onlineCount.getAndIncrement();

    }



    public   void subOnlineCount() {

        WebSocketServer.onlineCount.getAndDecrement();

    }

}
@ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。
 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
  注意的是在客户端链接关闭的方法onClose中,一定要 删除之前的订阅监听对象,就是下面这行代码:redisMessageListenerContainer.removeMessageListener(subscribeListener);
 否则在浏览器刷一下之后,后台会报如下错误:
java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session
原因就是当链接关闭之后,session对象就没有了,而订阅者对象还是会接收消息,在用session对象发送消息时会报错。
虽然代码中加了判断if (null != session && session.isOpen()) {  可以避免报错,但是为了防止内存泄漏,应该把没有用的监听者对象从容器中删除。

创建前端页面

   在resource\static目录下创建html页面,命名为websocket.html。代码如下:

  <!doctype html>

<html xmlns:th="http://www.thymeleaf.org">

<head>

    <meta charset="utf-8"></meta>

    <title>websocket</title>

</head>

<h4>

使用redis订阅消息和websocket实现消息推送

</h4>

<br/>

<h5>收到的订阅消息:</h5>

<div id="message_id"></div>

</body>

<script type="text/javascript">

    var websocket = null;

    //当前浏览前是否支持websocket

    if("WebSocket" in window){

        var url = "ws://localhost:8080/demo/websocket/server";

        websocket = new WebSocket(url);

    }else{

        alert("浏览器不支持websocket");

    }



    websocket.onopen = function(event){

        setMessage("打开连接");

    }



    websocket.onclose = function(event){

        setMessage("关闭连接");

    }



    websocket.onmessage = function(event){

        setMessage(event.data);

    }



    websocket.onerror = function(event){

        setMessage("连接异常");

    }



    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。

    window.onbeforeunload = function(){

        closeWebsocket();

    }



    //关闭websocket

    function closeWebsocket(){

        //3代表已经关闭

        if(3!=websocket.readyState){

            websocket.close();

        }else{

            alert("websocket之前已经关闭");

        }

    }

    //将消息显示在网页上

    function setMessage(message){

        document.getElementById('message_id').innerHTML += message + '<br/>';

    }

</script>

</html>
 

启动服务进行测试

  1. 启动springboot服务,浏览器输入地址:http://localhost:8080/demo/websocket.html,此时页面显示如下:

8c6e6c40d94ca69c810226d4b516aa62e3f.jpg

2.打开redis客户端,在命令行输入publish  TOPIC   “this is test message”

ba9da053add84e2f2c97a918728a5f4ad34.jpg

浏览器页面显示如下:

26ab884c7b9c13a5d066198c2d69ddff366.jpg

说明刚刚发布的消息已经主动推送到浏览器显示了。

  完整代码见: https://gitee.com/freide/springboot

转载于:https://my.oschina.net/freide/blog/2991435

  • 0
    点赞
  • 5
    收藏
    觉得还不错? 一键收藏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值