Netty-socketio集成redis,服务端集群推送消息

https://blog.csdn.net/evil_lrn/article/details/105808364

开始前,首先先学习一下概念room和namespace

官方地址链接 地址

简单的来说,socket会属于某一个room,如果没有指定那么就socket就会归属默认的room,每个room又会属于某一namespace下,默认namespace是/。

客户端连接时可以指定自己的socket的归属哪个namespace ,(这里有个坑,自己定义的namespace 一定是/xiuweiSapce,注意是/xiuweiSapce不是xiuweiSapce)比如使用

var socket = io.connect("http://localhost:9099/xiuweiSpace?room=F006");

至于归属那个room就是服务端自己设置的了。

接着就是Netty-socketIo的api了,这里不多做叙述。分享一篇博客,链接 ,文章内容排版都比我写的好。

剩下的言归正传,这是使用redis并且使用Java客户端redission作为集群推送的方案原因是因为netty-soketio与redisson是同一个作者。(这里再次膜拜大神)。这里使用redis发布订阅功能,需要服务端主动推送消息的时候,不管消息发动到哪个实例都会通过发布订阅推送到其他实例上,最总获取到所有的socketClient推送消息。

接着一言不合上代码 

引入所需要的pom

  1.  
    <dependency>
  2.  
    <groupId>com.corundumstudio.socketio</groupId>
  3.  
    <artifactId>netty-socketio</artifactId>
  4.  
    <version>1.7.17</version>
  5.  
    </dependency>
  6.  
     
  7.  
    <dependency>
  8.  
    <groupId>org.redisson</groupId>
  9.  
    <artifactId>redisson</artifactId>
  10.  
    <version>3.11.0</version>
  11.  
    </dependency>

相关的配置文件与配置类

  1.  
    #WebSocket
  2.  
    # host在本地测试可以设置为localhost或者本机IP,在Linux服务器跑可换成服务器IP
  3.  
    lsmdjsj.websocket.host=socketio.host=localhost
  4.  
    lsmdjsj.websocket.socket-port=9099
  5.  
    # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  6.  
    lsmdjsj.websocket.maxFramePayloadLength=1048576
  7.  
    # 设置http交互最大内容长度
  8.  
    lsmdjsj.websocket.maxHttpContentLength=1048576
  9.  
    # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  10.  
    lsmdjsj.websocket.upgradeTimeout=1000000
  11.  
    # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  12.  
    lsmdjsj.websocket.pingTimeout=6000000
  13.  
    # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  14.  
    lsmdjsj.websocket.pingInterval=25000
  15.  
     
  16.  
     
  17.  
    #redis
  18.  
    lsmdjsj.redisson.address=redis://127.0.0.1:6379
  19.  
    lsmdjsj.redisson.password=
  20.  
    lsmdjsj.redisson.database=5
  1.  
    @Data
  2.  
    @Component
  3.  
    @ConfigurationProperties(prefix = "lsmdjsj.websocket")
  4.  
    public class WebSocketProperties {
  5.  
     
  6.  
    /**
  7.  
    * socket 地址
  8.  
    */
  9.  
    private String host;
  10.  
    /**
  11.  
    * socket 端口
  12.  
    */
  13.  
    private Integer socketPort;
  14.  
    /**
  15.  
    * 最大每帧处理数据的长度
  16.  
    */
  17.  
    private String maxFramePayloadLength;
  18.  
    /**
  19.  
    * http交互最大内容长度
  20.  
    */
  21.  
    private String maxHttpContentLength;
  22.  
     
  23.  
     
  24.  
    /**
  25.  
    * Ping 心跳间隔(毫秒)
  26.  
    */
  27.  
    private Integer pingInterval;
  28.  
    /**
  29.  
    * Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  30.  
    */
  31.  
    private Integer pingTimeout;
  32.  
    /**
  33.  
    * 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  34.  
    */
  35.  
    private Integer upgradeTimeout;
  36.  
    }
  1.  
    @Data
  2.  
    @ConfigurationProperties(prefix = "lsmdjsj.redisson")
  3.  
    public class RedissonProperty {
  4.  
    private int timeout = 3000;
  5.  
     
  6.  
    private String address;
  7.  
     
  8.  
    private String password;
  9.  
     
  10.  
    private int connectionPoolSize = 5;
  11.  
     
  12.  
    private int connectionMinimumIdleSize=2;
  13.  
     
  14.  
    private int slaveConnectionPoolSize = 250;
  15.  
     
  16.  
    private int masterConnectionPoolSize = 250;
  17.  
     
  18.  
    private String[] sentinelAddresses;
  19.  
     
  20.  
    private String masterName;
  21.  
    private int database = 1;
  22.  
     
  23.  
    }

初始化netty-socketio的服务端和redisson

  1.  
    @Configuration
  2.  
    @EnableConfigurationProperties(RedissonProperty.class)
  3.  
    public class RedissonConfig {
  4.  
     
  5.  
    @Resource
  6.  
    private RedissonProperty conf;
  7.  
     
  8.  
    @Bean(name="redission",destroyMethod="shutdown")
  9.  
    public RedissonClient redission() {
  10.  
    Config config = new Config();
  11.  
    config.setCodec(new org.redisson.client.codec.StringCodec());
  12.  
    if(conf.getSentinelAddresses()!=null && conf.getSentinelAddresses().length>0){
  13.  
    config.useSentinelServers()
  14.  
    .setMasterName(conf.getMasterName()).addSentinelAddress(conf.getSentinelAddresses())
  15.  
    .setPassword(conf.getPassword()).setDatabase(conf.getDatabase());
  16.  
    }else{
  17.  
    SingleServerConfig serverConfig = config.useSingleServer()
  18.  
    .setAddress(conf.getAddress())
  19.  
    .setTimeout(conf.getTimeout())
  20.  
    .setConnectionPoolSize(conf.getConnectionPoolSize())
  21.  
    .setConnectionMinimumIdleSize(conf.getConnectionMinimumIdleSize())
  22.  
    .setDatabase(conf.getDatabase());
  23.  
    if(StringUtils.isNotBlank(conf.getPassword())) {
  24.  
    serverConfig.setPassword(conf.getPassword());
  25.  
    }
  26.  
    }
  27.  
    return Redisson.create(config);
  28.  
    }
  29.  
    }

初始化netty-socketio

  1.  
    @Configuration
  2.  
    public class SocketIoConfig {
  3.  
     
  4.  
    private static Logger logger = LoggerFactory.getLogger(SocketIoConfig.class);
  5.  
     
  6.  
    @Resource
  7.  
    private RedissonClient redisson;
  8.  
     
  9.  
    @Resource
  10.  
    private WebSocketProperties webSocketProperties;
  11.  
     
  12.  
    @Resource
  13.  
    private NettyExceptionListener nettyExceptionListener;
  14.  
     
  15.  
    /**
  16.  
    * 创建 StoreFactory
  17.  
    * @return
  18.  
    */
  19.  
    private RedissonStoreFactory createRedissonStoreFactory(){
  20.  
    logger.info("创建 RedissonStoreFactory 开始");
  21.  
    RedissonStoreFactory redissonStoreFactory = new RedissonStoreFactory(redisson);
  22.  
    logger.info("创建 RedissonStoreFactory 结束");
  23.  
    return redissonStoreFactory;
  24.  
    }
  25.  
     
  26.  
     
  27.  
     
  28.  
    @Bean
  29.  
    public SocketIOServer getSocketIOServer(){
  30.  
    logger.info("创建 SocketIOServer 开始");
  31.  
    //Sokcket配置 参考 jdk
  32.  
    SocketConfig socketConfig = new SocketConfig();
  33.  
     
  34.  
    socketConfig.setTcpNoDelay(true);
  35.  
    //在默认情况下,当调用close关闭socke的使用,close会立即返回,
  36.  
    // 但是,如果send buffer中还有数据,系统会试着先把send buffer中的数据发送出去,然后close才返回.
  37.  
    socketConfig.setSoLinger(0);
  38.  
    com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
  39.  
    // 设置监听端口
  40.  
    config.setPort(webSocketProperties.getSocketPort());
  41.  
    // 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间
  42.  
    config.setUpgradeTimeout(webSocketProperties.getUpgradeTimeout());
  43.  
    // Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
  44.  
    config.setPingInterval(webSocketProperties.getPingInterval());
  45.  
    // Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
  46.  
    config.setPingTimeout(webSocketProperties.getPingTimeout());
  47.  
    // 推荐使用redisson
  48.  
    config.setStoreFactory(createRedissonStoreFactory());
  49.  
    //异常处理
  50.  
    config.setExceptionListener(nettyExceptionListener);
  51.  
    //手动确认
  52.  
    config.setAckMode(AckMode.MANUAL);
  53.  
    // 握手协议参数使用JWT的Token认证方案 认证方案
  54.  
    config.setAuthorizationListener(data -> {
  55.  
    /* HttpHeaders httpHeaders = data.getHttpHeaders();
  56.  
    String token = httpHeaders.get("Authorization");*/
  57.  
    return true;
  58.  
    });
  59.  
    config.setSocketConfig(socketConfig);
  60.  
    logger.info("创建 SocketIOServer 结束");
  61.  
    return new SocketIOServer(config);
  62.  
    }
  63.  
     
  64.  
    /**
  65.  
    * spring
  66.  
    * @param socketServer
  67.  
    * @return
  68.  
    */
  69.  
    @Bean
  70.  
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
  71.  
    return new SpringAnnotationScanner(socketServer);
  72.  
    }
  73.  
     
  74.  
    @Bean
  75.  
    public PubSubStore pubSubStore(SocketIOServer socketServer) {
  76.  
    return socketServer.getConfiguration().getStoreFactory().pubSubStore();
  77.  
    }
  78.  
    }

启动socket服务,并且订阅事件,也可以自定义设置相应的namespace的相关事件

  1.  
    @Component
  2.  
    @Order(1)
  3.  
    public class SocketServerRunner implements CommandLineRunner {
  4.  
     
  5.  
    private static Logger logger = LoggerFactory.getLogger(SocketServerRunner.class);
  6.  
     
  7.  
    @Resource
  8.  
    private SocketIOServer socketIOServer;
  9.  
     
  10.  
    @Resource
  11.  
    private PubSubStore pubSubStore;
  12.  
     
  13.  
    @Resource
  14.  
    private RedissonClient redisson;
  15.  
     
  16.  
    @Override
  17.  
    public void run(String... args) throws Exception {
  18.  
    logger.info("socketIOServer 启动");
  19.  
    socketIOServer.start();
  20.  
     
  21.  
    //订阅消息
  22.  
    pubSubStore.subscribe(PubSubType.DISPATCH,data -> {
  23.  
    Collection<SocketIOClient> clients = null;
  24.  
    String room = data.getRoom();
  25.  
    String namespace = data.getNamespace();
  26.  
    Packet packet = data.getPacket();
  27.  
    String jsonData = packet.getData();
  28.  
    if(!StringUtils.isBlank(namespace)){
  29.  
    SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace);
  30.  
    if(StringUtils.isBlank(room)){
  31.  
    clients = socketIONamespace.getRoomOperations(room).getClients();
  32.  
    }
  33.  
    }else{
  34.  
    clients = socketIOServer.getBroadcastOperations().getClients();
  35.  
    }
  36.  
    if(!CollectionUtils.isEmpty(clients)){
  37.  
    for (SocketIOClient client : clients) {
  38.  
    client.sendEvent(Constants.PUSH_MSG,jsonData);
  39.  
    }
  40.  
    }
  41.  
    },DispatchMessage.class);
  42.  
    addNameSpace(socketIOServer);
  43.  
    }
  44.  
     
  45.  
     
  46.  
    private void addNameSpace(SocketIOServer socketIOServer){
  47.  
    SocketIONamespace xiuweiSpace = socketIOServer.addNamespace(Constants.XIU_WEI_NAME_SPACE);
  48.  
    xiuweiSpace.addConnectListener(client -> {
  49.  
    Map<String,Object> clientMap = new HashMap<>(16);
  50.  
    String nameSpace = client.getNamespace().getName();
  51.  
    String room = client.getHandshakeData().getSingleUrlParam("room");
  52.  
    String sessionId = client.getSessionId().toString();
  53.  
    logger.info("xiuweiSpace连接成功, room={},nameSpace={}, sessionId={}", room, nameSpace,sessionId);
  54.  
    if(StringUtils.isNotBlank(room)){
  55.  
    client.joinRoom(room);
  56.  
    clientMap.put("rooms",room);
  57.  
    }
  58.  
    clientMap.put("createTime", LocalDateTime.now().toString());
  59.  
    redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).trySet(clientMap);
  60.  
    });
  61.  
    xiuweiSpace.addDisconnectListener(client -> {
  62.  
    logger.info("客户端:" + client.getSessionId() + "断开连接");
  63.  
    String sessionId = client.getSessionId().toString();
  64.  
    redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).delete();
  65.  
    });
  66.  
    xiuweiSpace.addEventListener(Constants.XIU_WEI_EVINT,String.class,(client, data, ackSender) -> {
  67.  
    client.sendEvent(Constants.XIU_WEI_EVINT,data);
  68.  
    if (ackSender.isAckRequested()) {
  69.  
    logger.info("xiuwei接受到的消息. message={}", data);
  70.  
    }
  71.  
    });
  72.  
    }
  73.  
    }

默认处理未指定namesapce的socketClient 这里使用标签

  1.  
    @Component
  2.  
    public class MessageEventHandler {
  3.  
     
  4.  
    private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);
  5.  
     
  6.  
    public static ConcurrentMap<String, SocketIOClient> socketIOClientMap = new ConcurrentHashMap<>();
  7.  
     
  8.  
     
  9.  
    @Resource
  10.  
    private RedissonClient redisson;
  11.  
     
  12.  
    @Resource
  13.  
    private SocketIOServer socketIOServer;
  14.  
     
  15.  
    @OnConnect
  16.  
    public void onConnect(SocketIOClient client){
  17.  
    Map<String,Object> clientMap = new HashMap<>(16);
  18.  
    if(client!=null){
  19.  
    String room = client.getHandshakeData().getSingleUrlParam("room");
  20.  
    String nameSpace = client.getNamespace().getName();
  21.  
    String sessionId = client.getSessionId().toString();
  22.  
    logger.info("socket连接成功, room={}, sessionId={},namespace={}",room,sessionId,nameSpace);
  23.  
    if(StringUtils.isNotBlank(room)){
  24.  
    client.joinRoom(room);
  25.  
    clientMap.put("rooms",room);
  26.  
    }
  27.  
    clientMap.put("createTime", LocalDateTime.now().toString());
  28.  
    redisson.getBucket(Constants.KEY_ROOM_PREFIX+sessionId).trySet(clientMap);
  29.  
    }
  30.  
    }
  31.  
     
  32.  
    /**
  33.  
    * 客户端关闭连接时触发
  34.  
    *
  35.  
    * @param client
  36.  
    */
  37.  
    @OnDisconnect
  38.  
    public void onDisconnect(SocketIOClient client) {
  39.  
    logger.info("客户端:" + client.getSessionId() + "断开连接");
  40.  
    }
  41.  
     
  42.  
    /**
  43.  
    * 客户端事件
  44.  
    *
  45.  
    * @param client  客户端信息
  46.  
    * @param request 请求信息
  47.  
    * @param msg  客户端发送数据
  48.  
    */
  49.  
    @OnEvent(value = "messageevent")
  50.  
    public void onEvent(SocketIOClient client, AckRequest request, String msg) {
  51.  
    logger.info("发来消息:" + msg);
  52.  
    //回发消息
  53.  
    JSONObject jsonObject = JSON.parseObject(msg);
  54.  
    String message = jsonObject.getString("message");
  55.  
    Collection<SocketIOClient> clients = socketIOServer.getBroadcastOperations().getClients();
  56.  
    for (SocketIOClient clientByRoom : clients) {
  57.  
    clientByRoom.sendEvent("messageevent", client.getSessionId().toString()+": "+message);
  58.  
    }
  59.  
    }
  60.  
     
  61.  
    }

最后主动推送客户端的controller

  1.  
    @RequestMapping("/push")
  2.  
    @ResponseBody
  3.  
    public SinoHttpResponse<Boolean> pushMsgByService(@RequestBody ChatMessage chatMessage){
  4.  
    SocketIONamespace namespace = socketIOServer.getNamespace(chatMessage.getNamespace());
  5.  
    Collection<SocketIOClient> allClients = namespace.getAllClients();
  6.  
    for (SocketIOClient client : allClients) {
  7.  
    client.sendEvent(chatMessage.getEventName(),chatMessage.getMessage());
  8.  
    }
  9.  
    return SinoHttpResponse.success(true);
  10.  
    }

最后贴一下可以测试的html ,本人前端技术巨渣,这里使用jquery写的demo

  1.  
    <!DOCTYPE html>
  2.  
    <html lang="en" xmlns="http://www.w3.org/1999/xhtml" xmlns:th="http://www.thymeleaf.org">
  3.  
    <head>
  4.  
    <title>webSocket测试</title>
  5.  
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.3.0/socket.io.js"></script>
  6.  
    <!-- 新 Bootstrap 核心 CSS 文件 -->
  7.  
    <link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap.min.css">
  8.  
    <!-- 可选的Bootstrap主题文件(一般不用引入) -->
  9.  
    <link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap-theme.min.css">
  10.  
    <!-- jQuery文件。务必在bootstrap.min.js 之前引入 -->
  11.  
    <script src="//cdn.bootcss.com/jquery/1.11.3/jquery.min.js"></script>
  12.  
    <!--<script type="text/javascript" src="js/jquery-1.7.2.js"></script>-->
  13.  
    <!-- 最新的 Bootstrap 核心 JavaScript 文件 -->
  14.  
    <script src="//cdn.bootcss.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>
  15.  
    <script type="text/javascript">
  16.  
    $(function(){
  17.  
    /**
  18.  
    * 前端js的 socket.emit("事件名","参数数据")方法,是触发后端自定义消息事件的时候使用的,
  19.  
    * 前端js的 socket.on("事件名",匿名函数(服务器向客户端发送的数据))为监听服务器端的事件
  20.  
    **/
  21.  
    var socket = io.connect("http://localhost:9099?room=F006");
  22.  
    var firstconnect = true;
  23.  
    if(firstconnect) {
  24.  
    console.log("第一次链接初始化");
  25.  
    //监听服务器连接事件
  26.  
    socket.on('connect', function(){
  27.  
    $("#tou").html("链接服务器成功!");
  28.  
    });
  29.  
    //监听服务器关闭服务事件
  30.  
    socket.on('disconnect', function(){
  31.  
    $("#tou").html("与服务器断开了链接!");
  32.  
    });
  33.  
    //监听服务器端发送消息事件
  34.  
    socket.on('messageevent', function(data) {
  35.  
    $("#msg").html($("#msg").html() + "<br/>" + data);
  36.  
    });
  37.  
    firstconnect = false;
  38.  
    } else {
  39.  
    socket.socket.reconnect();
  40.  
    }
  41.  
     
  42.  
     
  43.  
     
  44.  
     
  45.  
     
  46.  
    $('#send').bind('click', function() {
  47.  
    send();
  48.  
    });
  49.  
     
  50.  
     
  51.  
     
  52.  
    function send(){
  53.  
    if (socket != null) {
  54.  
    debugger;
  55.  
    var message = document.getElementById('message').value;
  56.  
    var title = "message";
  57.  
    var obj = {message:message,title:title};
  58.  
    var str = JSON.stringify(obj);
  59.  
    socket.emit("messageevent",str);
  60.  
    } else {
  61.  
    alert('未与服务器链接.');
  62.  
    }
  63.  
    }
  64.  
    });
  65.  
    </script>
  66.  
     
  67.  
    </head>
  68.  
    <body>
  69.  
    <div class="page-header" id="tou">
  70.  
    webSocket及时聊天Demo程序
  71.  
    </div>
  72.  
    <div class="well" id="msg">
  73.  
    </div>
  74.  
    <div class="col-lg">
  75.  
    <div class="input-group">
  76.  
    <input type="text" class="form-control" placeholder="发送信息..." id="message">
  77.  
    <span class="input-group-btn">
  78.  
    <button class="btn btn-default" type="button" id="send" >发送</button>
  79.  
    </span>
  80.  
    </div><!-- /input-group -->
  81.  
    </div><!-- /.col-lg-6 -->
  82.  
    </div><!-- /.row --><br><br>
  83.  
    后台主动发送消息:<button class="btn btn-default" type="button" onclick="mess()">发送</button>
  84.  
    <br><br>
  85.  
    <a href="/user/index">back to index</a>
  86.  
    <script src="http://js.biocloud.cn/jquery/1.11.3/jquery.min.js"></script>
  87.  
    <script>
  88.  
    function mess() {
  89.  
    $.ajax({
  90.  
    type: "POST",
  91.  
    data:{index : "echo"},
  92.  
    url: "/websocket/auditing",
  93.  
    success: function (data) {
  94.  
    //alert(data);
  95.  
    }
  96.  
    });
  97.  
    }
  98.  
    </script>
  99.  
    </body>
  100.  
    </html>

最后感谢大家的观看,这只是一个demo,会存在各种各样的问题,欢迎大家指正

原文地址:https://www.cnblogs.com/xiami2046/p/13899962.html