Spring

Vue+Spring Boot实现WebSocket定时消息推送

 要实现本篇消息推送功能,首先要准备好:一个vue项目,一个已经集成Quartz框架的Spring Boot项目。

后端配置 

首先在pom中添加webSocket依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

然后创建一个 MySpringConfigurator。这个类的作用是端点配置类,用在接下来我们的Web Socket配置中。

从下面代码可以看出,如果不创建这个类,Web Socket注册的Bean默认是由ServerEndpointConfig自己管理的,这个类的作用就是把Web Socket相关Bean也交给Spring去管理:

public class MySpringConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware {
 
    private static volatile BeanFactory context;
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        MySpringConfigurator.context = applicationContext;
    }
 
    @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
        return context.getBean(clazz);
    }
}

创建真正的Web Socket配置类:

@Configuration
public class WebSocketConfig {

    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     *
     * @return ServerEndpointExporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * 注册自定义的配置类
     *
     * @return MySpringConfigurator
     */
    @Bean
    public MySpringConfigurator mySpringConfigurator() {
        return new MySpringConfigurator();
    }
}

然后是Web Socket的真正实现类。因为我没找到ws传输header的解决方案,所以只能在连接的时候,用url param去鉴权,如果鉴权失败就关闭连接:

@Slf4j
@Component
//此注解相当于设置访问URL
@ServerEndpoint(value = "/websocket/alarm/{userId}/{token}", configurator = MySpringConfigurator.class)
public class AlarmWebSocket {

    /**
     * 鉴权业务逻辑类
     */
    @Autowired
    private ShiroService shiroService;

    // 连接会话,通过它来和客户端交互
    private Session session;
    // 保存websocket连接
    public static final CopyOnWriteArraySet<AlarmWebSocket> ALARM_WEB_SOCKETS = new CopyOnWriteArraySet<>();
    // 保存用户和session的对应关系
    private static final Map<String, Session> sessionPool = new HashMap<>();

    /**
     * 连接成功回调
     * 因为web socket没有请求头,所以需要在连接成功的时候做一次鉴权
     * 如果鉴权失败,断开连接
     *
     * @param session session
     * @param userId  用户id
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId, @PathParam(value = "token") String token)
            throws IOException {
        // 根据accessToken,查询用户信息
        SysUserTokenEntity tokenEntity = shiroService.queryByToken(token);
        // token失效
        if (tokenEntity == null
                || tokenEntity.getExpireTime().getTime() < System.currentTimeMillis()
                || !userId.equalsIgnoreCase(tokenEntity.getUserId())) {
            // 自定义websocket关闭原因
            CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "鉴权失败!");
            session.close(closeReason);
            throw new IncorrectCredentialsException("token失效,请重新登录");
        }
        this.session = session;
        ALARM_WEB_SOCKETS.add(this);
        sessionPool.put(userId, session);
        log.info("【websocket消息】有新的连接,总数为:{}", ALARM_WEB_SOCKETS.size());
    }

    /**
     * 连接关闭回调
     */
    @OnClose
    public void onClose() {
        ALARM_WEB_SOCKETS.remove(this);
        log.info("【websocket消息】连接断开,总数为:{}", ALARM_WEB_SOCKETS.size());
    }

    /**
     * 收到信息的回调
     *
     * @param message 收到的信息
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【websocket消息】收到客户端消息:{}", message);
    }

    /**
     * 广播消息
     *
     * @param message 消息内容
     */
    public void sendAllMessage(String message) {
        for (AlarmWebSocket alarmWebSocket : ALARM_WEB_SOCKETS) {
            log.info("【websocket消息】广播消息:{}", message);
            try {
                Session session1 = alarmWebSocket.session;
                session1.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 发送一对一消息
     *
     * @param userId  对端userId
     * @param message 消息内容
     */
    public void sendTextMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

最后就是定时推送任务的编写了,因为集成了 Quartz 框架,所以后台只需要写一个定时任务Bean就好了。没有集成的朋友们可以用 @Scheduled 注解定义定时任务来代替:

@Slf4j
@Component("alarmTask")
public class AlarmTask implements ITask {
    @Autowired
    private AlarmWebSocket alarmWebSocket;
    @Override
    public void run(String params) {
        // 如果有连接则查询报警消息并推送
        if (AlarmWebSocket.ALARM_WEB_SOCKETS.size() > 0) {
            alarmWebSocket.sendAllMessage("需要推送的数据");
        }
        log.info("alarmTask定时任务正在执行");
    }
}

注:我在开发的过程中遇到了一个问题,AlarmWebSocket里的 session.getBasicRemote().sendText(message); 这句代码,如果我使用 sendText 方法就可以发送消息,用 sendObject 就不能。期待以后有时间了研究一下。

前端配置

前端配置倒也简单了,都是制式的东西,代码如下:

    // 初始化websocket
    initWebSocket: function () {
      // 创建websocket连接,传入鉴权参数
      let userId = Vue.cookie.get('userId');
      let token = Vue.cookie.get('token');
// 这里的url要换成你自己的url
this.websock = new WebSocket(window.SITE_CONFIG.wsUrl + "/websocket/alarm/" + userId + "/" + token); // 配置回调方法 this.websock.onopen = this.websocketOnOpen; this.websock.onerror = this.websocketOnError; this.websock.onmessage = this.websocketOnMessage; this.websock.onclose = this.websocketClose; }, // 连接成功回调 websocketOnOpen: function () { console.log("WebSocket连接成功"); }, // 错误回调 websocketOnError: function (e) { console.log("WebSocket连接发生错误"); console.log(e); }, // 收到消息回调 websocketOnMessage: function (e) { let obj = JSON.parse(e.data); ...业务逻辑 }, // 连接关闭回调 websocketClose: function (e) { console.log("WebSocket连接成功"); }

然后在页面生命周期函数里面调用方法:

  created() {
    // 连接websocket
    this.initWebSocket();
  },
  destroyed() {
    this.websocketClose();
  },

到这里一个定时消息推送功能就全部完成了。

原文地址:https://www.cnblogs.com/helios-fz/p/14871280.html