websocket 点对点、广播推送

废话就不多说了。

1、springboot项目 引入websocket依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、springbooot在启动类中注入 ServerEndpointExporter

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

3、Websocket组件类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@ServerEndpoint("/websocket/{sessionId}")
public class WebSocket {

    private static final Logger logger = LoggerFactory  .getLogger(WebSocket.class);

    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    private Session session;

    private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();

    private String sessionId="";

    @OnOpen
    public void onOpen(Session session,@PathParam("sessionId") String sessionId) {
        this.session = session;
        this.sessionId=sessionId;
        webSockets.add(this);
        int count = onlineCount.incrementAndGet(); // 在线数加1
        logger.info("有连接加入,当前连接数为:{}", count);
        sendMessage("0");

    }

    /**
     * 关闭调用方法
     */
    @OnClose
    public void onClose() {
        webSockets.remove(this);
        int count = onlineCount.decrementAndGet();
        logger.info("有连接关闭,当前连接数为:{}", count);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        logger.error("发生错误:{},Session ID: {}",error.getMessage(),session.getId());
        error.printStackTrace();
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("收到来自客户"+sessionId+"的信息:{}", message);
        sendMessage(message);
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message){
        try {//推送消息和当前socket通道的会话id (客户)
            this.session.getBasicRemote().sendText(message +"=="+ sessionId);
        } catch (IOException e) {
            logger.error("websocket异常: "+ e.toString());
        }
    }


    public static void sendInfo(String message,@PathParam("sessionId") String sessionId){
        logger.info("推送消息到窗口"+sessionId+",推送内容:"+message);
        for (WebSocket item : webSockets) {
                //这里可以设定只推送给这个sid的,为null则全部推送
                if(sessionId==null) {
                    item.sendMessage(message);
                }else if(item.sessionId.equals(sessionId)){
                    item.sendMessage(message);
                }
        }

    }

    /**
     * 消息广播到前台
     *
     * @param message
     */
    public void sendAllMessage(String message) {
        for (WebSocket webSocket : webSockets) {
            try {
                webSocket.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                logger.error("websocket异常: "+ e.toString());
            }
        }
    }

}

4、测试方法  点对点推送,偶数给客户a推送  奇数给客户b推送    推荐一个在线websocket测试工具:戳这里

    @Autowired
    private WebSocket webSocket;
@RequestMapping(value
= "/test", method = RequestMethod.POST, produces = "application/json;charset=utf-8") public String test(){ String sessionId = ""; for(int i=0;i<8;i++){ JSONObject js = new JSONObject(); js.put("isHasNewMessage", i); if (i %2 ==0){ sessionId = "a"; }else{ sessionId ="b"; } webSocket.sendInfo(js.toString(), sessionId); } return "成功"; }

输入本机websocket地址 ws://ip:port/websocket/{}     说明 /websocket/{}  是websocket组件类的@ServerEndpoint("/websocket/{sessionId}") 中指明的路径后缀

如图,现在建立a客户 的socket连接通道

 再建立b客户 的socket连接通道

 然后postman调用 测试方法  http://localhost:8080/test, 分别查看客户a和客户b的消息推送结果

客户a:

 客户b:

 5、测试方法  广播推送,所有数据给客户a、客户b推送  

    @Autowired
    private WebSocket webSocket;
@RequestMapping(value = "/test", method = RequestMethod.POST, produces = "application/json;charset=utf-8") public String test(){ String sessionId = ""; for(int i=0;i<8;i++){ JSONObject js = new JSONObject(); js.put("isHasNewMessage", i); if (i %2 ==0){ sessionId = "a"; }else{ sessionId ="b"; } webSocket.sendAllMessage(js.toString()); } return "成功"; }

同样的,先建立两个客户a和客户b的连接,这里就不在重复操作截图了,直接调用测试方法,然后看客户a、客户b是否收到了消息

客户a

客户b

其他相关的截图:浏览器同时开启两个socket连接 连客户a和客户b,然后调接口 

 客户端给服务器发送消息:

1、首先建立客户a的连接,连接成功后,在下边的发送消息框中输入消息,然后点击发送

 2、可以看到如下效果 首先看控制台,分别是建立连接时候打印的有多少客户连接  然后 客户a发送给socket服务端的 歌词,最后看浏览器的页面

ok,that's all

原文地址:https://www.cnblogs.com/xuchao0506/p/14929646.html