webSocket 聊天和推送

 

1、添加依赖

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>sockjs-client</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>stomp-websocket</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>bootstrap</artifactId>
            <version>3.3.7</version>
        </dependency>
        
       
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>jquery</artifactId>
            <version>3.1.0</version>
        </dependency>
</dependencies>

2、webjars使用介绍

1、方便统一管理
2、主要解决前端框架版本不一致,文件混乱等问题
3、把前端资源,打包成jar包,借助maven工具进行管理
使用方法:

在前端就这样引入

<script src="/webjars/stomp-websocket/2.3.3/stomp.min.js"></script>

3、配置文件的配置

package xdclass_websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import xdclass_websocket.intecepter.HttpHandShakeIntecepter;
import xdclass_websocket.intecepter.SocketChannelIntecepter;


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{

    
    /**
     * 注册端点,发布或者订阅消息的时候需要连接此端点
     * setAllowedOrigins 非必须,*表示允许其他域进行连接
     * withSockJS  表示开始sockejs支持
     * new HttpHandShakeIntecepter() 这个是添加一个拦截器http握手拦截器,可以通过这个类的方法获取resuest,和response
     * 可以通过这个类把sessionId放入在发送信息的时候可以在拦截器里出取出
     */
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/endpoint-websocket").addInterceptors(new HttpHandShakeIntecepter())
        .setAllowedOrigins("*").withSockJS();
    }

    /**
     * 配置消息代理(中介)
     * enableSimpleBroker 服务端推送给客户端的路径前缀
     * setApplicationDestinationPrefixes  客户端发送数据给服务器端的一个前缀
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        
        registry.enableSimpleBroker("/topic", "/chat");
        registry.setApplicationDestinationPrefixes("/app");
        
    }

    /**
     * 添加自己定义的拦截器 可以拦截发送请求返回请求拦截断开等
     * new SocketChannelIntecepter() 功能描述:频道拦截器 ,类似管道,可以获取消息的一些meta数据
     * @param registration
     */

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors( new SocketChannelIntecepter());
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.interceptors( new SocketChannelIntecepter());
    }

    
    
    
    
}
websocket配置

4、推送消息

package xdclass_websocket.service;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

import xdclass_websocket.controller.v5.StockService;
import xdclass_websocket.controller.v6.User;
import xdclass_websocket.model.InMessage;
import xdclass_websocket.model.OutMessage;


/**
 * 
 * 功能描述:简单消息模板,用来推送消息
 
 */
@Service
public class WebSocketService {

    //这个是用于把消息发送给订阅者
    //template.convertAndSend("/topic/chat",new OutMessage(msg));第一个参数是前台订阅的地址第二个参数是发布的消息
    @Autowired
    private SimpMessagingTemplate template;
    
    public void sendTopicMessage(String dest, InMessage message) throws InterruptedException{

        for(int i=0; i<20; i++){
            Thread.sleep(500L);
            template.convertAndSend(dest, new OutMessage(message.getContent()+i));
        }


    }

    public void sendChatMessage(InMessage message) {
        template.convertAndSend("/chat/single/"+message.getTo(),
                new OutMessage(message.getFrom()+" 发送:"+ message.getContent()));

    }

    /**
     *
     * 功能描述:获取系统信息,推送给客户端
     *
     * <p> 创建时间:Jan 5, 2018 </p>
     * <p> 贡献者:小D学院, 官网:www.xdclass.net </p>
     *
     * @author <a href="mailto:xd@xdclass.net">小D老师</a>
     * @since 0.0.1
     */
    public void sendServerInfo() {

        int processors = Runtime.getRuntime().availableProcessors();

        Long freeMem = Runtime.getRuntime().freeMemory();

        Long maxMem = Runtime.getRuntime().maxMemory();

        String message = String.format("服务器可用处理器:%s; 虚拟机空闲内容大小: %s; 最大内存大小: %s", processors,freeMem,maxMem );

        template.convertAndSend("/topic/server_info",new OutMessage(message));

    }

    /**
     *
     * 功能描述:v5 版本,股票信息推送
     *
     * <p> 创建时间:Jan 6, 2018 </p>
     * <p> 贡献者:小D学院, 官网:www.xdclass.net </p>
     *
     * @author <a href="mailto:xd@xdclass.net">小D老师</a>
     * @since 0.0.1
     */
    public void sendStockInfo() {

        Map<String, String> stockInfoMap = StockService.getStockInfo();
        String msgTpl = "名称: %s ; 价格: %s元 ; 最高价: %s ; 最低价: %s ; 涨跌幅: %s ; 市盈率TTM: %s ; 总市值: %s";

        if (null != stockInfoMap) {
            String msg = String.format(msgTpl, stockInfoMap.get("prod_name"), stockInfoMap.get("last_px"), stockInfoMap.get("high_px"),
                    stockInfoMap.get("low_px"), stockInfoMap.get("px_change"), stockInfoMap.get("market_value"), stockInfoMap.get("amplitude") );

            template.convertAndSend("/topic/stock_info",new OutMessage(msg));
        }
    }

    /**
     *
     * 功能描述:发送在线用户
     *
     * <p> 创建时间:Jan 6, 2018 </p>
     * <p> 贡献者:小D学院, 官网:www.xdclass.net </p>
     *
     * @author <a href="mailto:xd@xdclass.net">小D老师</a>
     * @since 0.0.1
     */
    public void sendOnlineUser(Map<String, User> onlineUser) {
        String msg = "";
        for(Map.Entry<String, User> entry : onlineUser.entrySet()){
            msg = msg.concat(entry.getValue().getUsername()+"||");
        }
        System.out.println(msg);
        template.convertAndSend("/topic/onlineuser",new OutMessage(msg));
    }

    /**
     *  
     * 功能描述: v6: 用于多人聊天
     *
     * <p> 创建时间:Jan 6, 2018 </p>
     * <p> 贡献者:小D学院, 官网:www.xdclass.net </p>
     *
     * @author <a href="mailto:xd@xdclass.net">小D老师</a>
     * @since 0.0.1
     */
    public void sendTopicChat(InMessage message) {
        String msg = message.getFrom() +" 发送:"+message.getContent();
        template.convertAndSend("/topic/chat",new OutMessage(msg));
    }
    
    
}
推送消息的接口

5、接收信息的实体类

package xdclass_websocket.model;

import java.util.Date;

public class InMessage {
    
    //从哪里来
    private String from;
    
    //到哪里去
    private String to;
    
    private String content;
    
    private Date time;

    public String getFrom() {
        return from;
    }

    
    public InMessage(){}
    
    public InMessage(String content) {
        this.content = content;
    }
    
    
    
    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }
    
    
    


}
接收消息的实体类
package xdclass_websocket.model;

import java.util.Date;

public class OutMessage {

    private String from;
    
    private String content;
    
    private Date time = new Date();

    public OutMessage(){}
    
    public OutMessage(String content){
        this.content = content;
        
    }
    
    
    public String getFrom() {
        return from;
    }
    
    

    public void setFrom(String from) {
        this.from = from;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }
    
    
    
    
}
推送接口实体类

6、登录,接收消息并将信息发送给订阅者

package xdclass_websocket.controller.v6;

import java.util.HashMap;
import java.util.Map;

import javax.servlet.http.HttpSession;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

import xdclass_websocket.model.InMessage;
import xdclass_websocket.service.WebSocketService;



@Controller
public class UserChatController {

    @Autowired
    private WebSocketService ws;
    
    
    //模拟数据库用户的数据
    public static Map<String, String> userMap = new HashMap<String, String>();
    static{
        userMap.put("jack", "123");
        userMap.put("mary", "456");
        userMap.put("tom", "789");
        userMap.put("tim", "000");
        userMap.put("小D", "666");
    }
    
    //在线用户存储
    public static Map<String, User> onlineUser = new HashMap<>();
    static{
        onlineUser.put("123",new User("admin","888"));
    }
    
    
    /**
     * 
     * 功能描述:用户登录

     */
    @RequestMapping(value="login", method=RequestMethod.POST)
    public String userLogin( @RequestParam(value="username", required=true)String username, 
            @RequestParam(value="pwd",required=true) String pwd, HttpSession session) {
        
        String password = userMap.get(username);
        if (pwd.equals(password)) {
            User user = new User(username, pwd);
            String sessionId = session.getId();
            onlineUser.put(sessionId, user);
            return "redirect:/v6/chat.html";
        } else {
            return "redirect:/v6/error.html";
        }
        
    }
    
    
    /**
     * 
     * 功能描述:用于定时给客户端推送在线用户
     */
    @Scheduled(fixedRate = 2000)
    public void onlineUser() {
        
        ws.sendOnlineUser(onlineUser);
    }
    
    
    
    
    
    /**
     * 
     * 功能描述 聊天接口
     * 接收前台发送过来的消息并将消息发送给订阅者
     *
     */
    @MessageMapping("/v6/chat")
    public void topicChat(InMessage message, SimpMessageHeaderAccessor headerAccessor){
        String sessionId = headerAccessor.getSessionAttributes().get("sessionId").toString();
        User user = onlineUser.get(sessionId);
        message.setFrom(user.getUsername());
        //把消息推送给订阅的 里面的方法如 template.convertAndSend("/topic/chat",new OutMessage(msg));
        ws.sendTopicChat(message);
        
    }
    
    
    
    
}
登录,接收消息发送订阅者

7、前端代码建立连接并订阅消息

js的引入

    <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
    <script src="/webjars/jquery/jquery.min.js"></script>
    <script src="/webjars/sockjs-client/sockjs.min.js"></script>
    <script src="/webjars/stomp-websocket/stomp.min.js"></script>
    <script src="/v6/app.js"></script>
引入js
function connect() {
    //后台配置文件中配置的基站的名称
    var socket = new SockJS('/endpoint-websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
       // setConnected(true);
        console.log('Connected: ' + frame);
        
        //订阅群聊消息
        stompClient.subscribe('/topic/chat', function (result) {
            showContent(JSON.parse(result.body));
        });
        
        //订阅在线用户消息
        stompClient.subscribe('/topic/onlineuser', function (result) {
            showOnlieUser(JSON.parse(result.body));
        });
        
        
    });
}
连接并订阅消息

8、断开连接

function disconnect() {
    if (stompClient !== null) {
        stompClient.disconnect();
    }
    //setConnected(false);
    console.log("Disconnected");
}
断开连接

9、发送消息

//发送聊天记录
function sendContent() {
     //这个/app是个前缀是在后端配置文中配置的 stompClient.send(
"/app/v6/chat", {}, JSON.stringify({'content': $("#content").val()})); }

这样项目就可以使用了下面是各种websocket的拦截器

10、http握手拦截器,可以通过这个类的方法获取resuest,和response

package xdclass_websocket.intecepter;

import java.util.Map;

import javax.servlet.http.HttpSession;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

/**
 * 
 * 功能描述:http握手拦截器,可以通过这个类的方法获取resuest,和response
 
 */
public class HttpHandShakeIntecepter implements HandshakeInterceptor{

    @Override
    public boolean beforeHandshake(ServerHttpRequest request,
            ServerHttpResponse response, WebSocketHandler wsHandler,
            Map<String, Object> attributes) throws Exception {

        System.out.println("【握手拦截器】beforeHandshake");
        
        
        if(request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest)request;
            HttpSession session =  servletRequest.getServletRequest().getSession();
            String sessionId = session.getId();
            System.out.println("【握手拦截器】beforeHandshake sessionId="+sessionId);
            attributes.put("sessionId", sessionId);
        }
        
        return true;
    }

    
    
    @Override
    public void afterHandshake(ServerHttpRequest request,
            ServerHttpResponse response, WebSocketHandler wsHandler,
            Exception exception) {
        System.out.println("【握手拦截器】afterHandshake");
        
        if(request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest)request;
            HttpSession session =  servletRequest.getServletRequest().getSession();
            String sessionId = session.getId();
            System.out.println("【握手拦截器】afterHandshake sessionId="+sessionId);
        }
        
        
        
    }

}
http握手拦截器

11、发送消息,建立连接,订阅,断开时的拦截器(这个可以判断上线用户和下线用户)

package xdclass_websocket.intecepter;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;

import xdclass_websocket.controller.v6.UserChatController;

/**
 * 
 * 功能描述:频道拦截器 ,类似管道,可以获取消息的一些meta数据
 *
 *
 */
public class SocketChannelIntecepter extends ChannelInterceptorAdapter{

    /**
     * 在完成发送之后进行调用,不管是否有异常发生,一般用于资源清理
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel,
            boolean sent, Exception ex) {
        System.out.println("SocketChannelIntecepter->afterSendCompletion");
        super.afterSendCompletion(message, channel, sent, ex);
    }

    
    /**
     * 在消息被实际发送到频道之前调用
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        System.out.println("SocketChannelIntecepter->preSend");
        
        return super.preSend(message, channel);
    }

    /**
     * 发送消息调用后立即调用
     */
    @Override
    public void postSend(Message<?> message, MessageChannel channel,
            boolean sent) {
        System.out.println("SocketChannelIntecepter->postSend");
        
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message);//消息头访问器
        
        if (headerAccessor.getCommand() == null ) return ;// 避免非stomp消息类型,例如心跳检测
        
        String sessionId = headerAccessor.getSessionAttributes().get("sessionId").toString();
        System.out.println("SocketChannelIntecepter -> sessionId = "+sessionId);
        
        switch (headerAccessor.getCommand()) {
        case CONNECT:
            connect(sessionId);
            break;
        case DISCONNECT:
            disconnect(sessionId);
            break;
        case SUBSCRIBE:
            
            break;
            
        case UNSUBSCRIBE:
            
            break;
        default:
            break;
        }
        
    }

    
    //连接成功
    private void connect(String sessionId){
        System.out.println("connect sessionId="+sessionId);
    }
    
    
    //断开连接
    private void disconnect(String sessionId){
        System.out.println("disconnect sessionId="+sessionId);
        //用户下线操作
        UserChatController.onlineUser.remove(sessionId);
    }
    
    
    
    
    
}
拦截器

12、配置文件要把这两个拦截器加入才能使用

 13、连接的监听器

package xdclass_websocket.listener;

import org.springframework.context.ApplicationListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectEvent;

@Component
public class ConnectEventListener implements ApplicationListener<SessionConnectEvent>{

    public void onApplicationEvent(SessionConnectEvent event) {
        StompHeaderAccessor headerAccessor =  StompHeaderAccessor.wrap(event.getMessage());
        System.out.println("【ConnectEventListener监听器事件 类型】"+headerAccessor.getCommand().getMessageType());
        
        
    }

}

14、订阅的监听

package xdclass_websocket.listener;

import org.springframework.context.ApplicationListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;


/**
 * 
 * 功能描述:springboot使用,订阅事件
 *
 * <p> 创建时间:Jan 4, 2018 </p> 
 * <p> 贡献者:小D学院, 官网:www.xdclass.net </p>
 *
 * @author <a href="mailto:xd@xdclass.net">小D老师</a>
 * @since 0.0.1
 */
@Component
public class SubscribeEventListener implements ApplicationListener<SessionSubscribeEvent>{

    /**
     * 在事件触发的时候调用这个方法
     * 
     * StompHeaderAccessor  简单消息传递协议中处理消息头的基类,
     * 通过这个类,可以获取消息类型(例如:发布订阅,建立连接断开连接),会话id等
     * 
     */
    public void onApplicationEvent(SessionSubscribeEvent event) {
        StompHeaderAccessor headerAccessor =  StompHeaderAccessor.wrap(event.getMessage());
        System.out.println("【SubscribeEventListener监听器事件 类型】"+headerAccessor.getCommand().getMessageType());
        System.out.println("【SubscribeEventListener监听器事件 sessionId】"+headerAccessor.getSessionAttributes().get("sessionId"));
        
    }

}
原文地址:https://www.cnblogs.com/dkws/p/12384678.html