Springboot + Rabbitmq + WebSocet + vue

1、pom.xml

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

2、WebSocketConfig

启用WebSocket的支持也是很简单,几句代码搞定

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3、WebSocketServer

因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller
直接@ServerEndpoint("/websocket")@Component启用即可,然后在里面实现@OnOpen,@onClose,@onMessage等方法

package com.yanan.base.config.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/websocket/{userId}") //WebSocket客户端建立连接的地址 @Component @Slf4j public class WebSocketServer { private final Logger log= LoggerFactory.getLogger(WebSocketServer.class); /** * 存活的session集合(使用线程安全的map保存) */ private static Map<String, Session> livingSessions = new ConcurrentHashMap<>(); /** * 建立连接的回调方法 * * @param session 与客户端的WebSocket连接会话 * @param userId 用户名,WebSocket支持路径参数 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { livingSessions.put(session.getId(), session); log.info(userId + "进入连接"); } @OnMessage public void onMessage(String message, Session session, @PathParam("userId") String userId) { log.info(userId + " : " + message); sendMessageToAll(userId + " : " + message); } @OnError public void onError(Session session, Throwable error) { log.info("发生错误"); log.error(error.getStackTrace() + ""); } @OnClose public void onClose(Session session, @PathParam("userId") String userId) { livingSessions.remove(session.getId()); log.info(userId + " 关闭连接"); } /** * 单独发送消息 * * @param session * @param message */ public void sendMessage(Session session, String message) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } /** * 群发消息 * * @param message */ public void sendMessageToAll(String message) { livingSessions.forEach((sessionId, session) -> { sendMessage(session, message); }); } }

 4、接收rabbitmq消息

      application.yml配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: test
    password: test
    virtual-host: /

  监听器接收消息队列RabbitmqListener.java

package com.yanan.base.config.rabbit;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yanan.base.config.rabbitmq.WebSocketServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import sun.misc.BASE64Decoder;

import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;

import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

@Component
@Slf4j
public class RabbitmqListener {
    private final Logger log= LoggerFactory.getLogger(RabbitmqListener.class);
    @Autowired
    private WebSocketServer webSocketServer;
    //  AES加密解密
    private String sKey = "785641234654";
    private String ivParamter = "0392039203";
    //queue指要监听列队的名字
    @RabbitListener(queues = "sdf")
    public void reveiverDirectQueue(String message){
        decrypt(message);
    }
    public JSONObject decrypt(String message) {
        try {
            byte[] raw = sKey.getBytes("ASCII");
            SecretKeySpec skeySpec = new SecretKeySpec(raw, "AES");
            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            IvParameterSpec iv = new IvParameterSpec(ivParamter.getBytes());
            cipher.init(Cipher.DECRYPT_MODE, skeySpec, iv);
            byte[] encrypted1 = new BASE64Decoder().decodeBuffer(message);// 先用 base64 解密
            byte[] original = cipher.doFinal(encrypted1);
            String originalString = new String(original, "utf-8");
            JSONObject object = JSON.parseObject(originalString);
// websocket发送消息 webSocketServer.sendMessageToAll(originalString); log.info("监听消息:" + originalString); return object; } catch (Exception ex) { return null; } } }

5、vue

data () {
    return {
      queueReceiveSetting: { // 消息队列配置
        websock: null,
        client: null,
        wsuri: 'ws://localhost:8089/websocket/dfgd'
      }
    }
}
  methods: {
    initWebSocket () {
      // ws地址
      if (this.queueReceiveSetting.websock) {
        this.queueReceiveSetting.websock.close()
      }
      this.queueReceiveSetting.websock = new WebSocket(this.queueReceiveSetting.wsuri)
      this.queueReceiveSetting.websock.onopen = res => {
        console.log('开启连接')
      }
      this.queueReceiveSetting.websock.onmessage = res => {
        let data = JSON.parse(res.data)
        console.log('接收到的数据:', data)
      }
      this.queueReceiveSetting.websock.onclose = res => {
        console.log('连接关闭')
      }
      this.queueReceiveSetting.websock.onerror = res => {
        console.log('连接出错')
      }
    }
  },
  mounted () {
this.initWebSocket()
}

  

原文地址:https://www.cnblogs.com/zqyw/p/11132467.html