【springcloud】【转载】基于redis消息订阅和websocket实现的消息推送

基于redis消息订阅和websocket技术实现的消息推送

            本文【转载】自:https://my.oschina.net/freide/blog/2991435

依赖文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>

创建Redis消息监听者容器

@Configuration
public class RedisConfig {

    /**
     * 创建消息监听器
     * @param factory
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        return container;
    }
}

创建Websocket配置类

 

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
* 这个配置类的作用是要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。
* 如果是使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。

*/ @Component
public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

 

创建消息订阅监听者类

 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import javax.websocket.Session;
import java.io.IOException;

/**
 * redis消息订阅监听者
 */
@Component
public class RedisSubscribeListener implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    //webSocket客户端会话对象
    private Session session;

    /**
     * 接收发布者消息
     * @param message
     * @param bytes
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        String msg = new String(message.getBody());
        logger.info("[{}]主题发布:{}", new String(bytes), msg);
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(msg);
            } catch (IOException e) {
                logger.error("[redis监听器]发布消息异常:{}", e);
            }
        }
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}

 这个消息订阅监听者类持有websocket的客户端会话对象(session),当接收到订阅的消息时,通过这个会话对象(session)将消息发送到前端,从而实现消息的主动推送。

创建Websocket服务端类

@Component

@ServerEndpoint("/websocket/server")

public class WebSocketServer {

    /**

     * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例

     */

    private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。

     private static  AtomicInteger onlineCount=new AtomicInteger(0);

     //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识

     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

     //与某个客户端的连接会话,需要通过它来给客户端发送数据

     private Session session;

     private SubscribeListener subscribeListener;

    /**

     * 连接建立成功调用的方法

     * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据

     */

    @OnOpen

    public void onOpen(Session session){

        this.session = session;

        webSocketSet.add(this);     //加入set中

        addOnlineCount();           //在线数加1

        System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());

        subscribeListener = new SubscribeListener();

        subscribeListener.setSession(session);

        //设置订阅topic

        redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("TOPIC"));

    }

    /**

     * 连接关闭调用的方法

     */

    @OnClose

    public void onClose() throws IOException {

        webSocketSet.remove(this);  //从set中删除

        subOnlineCount();           //在线数减1

        redisMessageListenerContainer.removeMessageListener(subscribeListener);

        System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());

    }



    /**

     * 收到客户端消息后调用的方法

     * @param message 客户端发送过来的消息

     * @param session 可选的参数

     */

    @OnMessage

    public void onMessage(String message, Session session) {

        System.out.println("来自客户端的消息:" + message);

        //群发消息

        for(WebSocketServer item: webSocketSet){

            try {

                item.sendMessage(message);

            } catch (IOException e) {

                e.printStackTrace();

                continue;

            }

        }

    }



    /**

     * 发生错误时调用

     * @param session

     * @param error

     */

    @OnError

    public void onError(Session session, Throwable error){

        System.out.println("发生错误");

        error.printStackTrace();

    }



    /**

     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。

     * @param message

     * @throws IOException

     */

    public void sendMessage(String message) throws IOException {

        this.session.getBasicRemote().sendText(message);

    }



    public   int getOnlineCount() {

        return onlineCount.get();

    }



    public   void addOnlineCount() {

        WebSocketServer.onlineCount.getAndIncrement();

    }



    public   void subOnlineCount() {

        WebSocketServer.onlineCount.getAndDecrement();

    }

}
@ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,
客户端可以通过这个URL来连接到WebSocket服务器端使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,
但在springboot中连容器都是spring管理的。

 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。


注意的是在客户端链接关闭的方法onClose中,一定要 删除之前的订阅监听对象,就是下面这行代码:
redisMessageListenerContainer.removeMessageListener(subscribeListener);
 
 否则在浏览器刷一下之后,后台会报如下错误:
java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close())
may be called on a closed session
 
原因就是当链接关闭之后,session对象就没有了,而订阅者对象还是会接收消息,在用session对象发送消息时会报错。
虽然代码中加了判断 if(null != session && session.isOpen()) { 可以避免报错,但是为了防止内存泄漏,应该把没有用的监听者对象从容器中删除。

创建前端页面

   在resourcestatic目录下创建html页面,命名为websocket.html。代码如下:

 <!doctype html>

<html xmlns:th="http://www.thymeleaf.org">

<head>

    <meta charset="utf-8"></meta>

    <title>websocket</title>

</head>

<h4>

使用redis订阅消息和websocket实现消息推送

</h4>

<br/>

<h5>收到的订阅消息:</h5>

<div id="message_id"></div>

</body>

<script type="text/javascript">

    var websocket = null;

    //当前浏览前是否支持websocket

    if("WebSocket" in window){

        var url = "ws://localhost:8080/demo/websocket/server";

        websocket = new WebSocket(url);

    }else{

        alert("浏览器不支持websocket");

    }



    websocket.onopen = function(event){

        setMessage("打开连接");

    }



    websocket.onclose = function(event){

        setMessage("关闭连接");

    }



    websocket.onmessage = function(event){

        setMessage(event.data);

    }



    websocket.onerror = function(event){

        setMessage("连接异常");

    }



    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。

    window.onbeforeunload = function(){

        closeWebsocket();

    }



    //关闭websocket

    function closeWebsocket(){

        //3代表已经关闭

        if(3!=websocket.readyState){

            websocket.close();

        }else{

            alert("websocket之前已经关闭");

        }

    }

    //将消息显示在网页上

    function setMessage(message){

        document.getElementById('message_id').innerHTML += message + '<br/>';

    }

</script>

</html>
View Code

启动服务进行测试

  1. 启动springboot服务,浏览器输入地址:http://localhost:8080/demo/websocket.html,此时页面显示如下

 2.打开redis客户端,在命令行输入publish  TOPIC   “this is test message”

 浏览器页面显示如下:

说明刚刚发布的消息已经主动推送到浏览器显示了。

  完整代码见: https://gitee.com/freide/springboot

 

【转载】自 https://my.oschina.net/freide/blog/2991435

 

 

 

____________________________特此,勉励____________________________
本文作者cheng2839
本文链接https://www.cnblogs.com/cheng2839
关于博主:评论和私信会在第一时间回复。
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
声援博主:如果您觉得文章对您有帮助,可以点击文章右下角【推荐】一下。您的鼓励是博主的最大动力!
原文地址:https://www.cnblogs.com/cheng2839/p/13637235.html