Spring Boot 集成 netty-socketio

1. 添加依赖

<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.18</version>
</dependency>

2. 添加YML配置

# Socket.IO server settings (bound to SocketProperties via the "socket" prefix)
socket:
  # Port the Socket.IO server listens on
  port: 9090
  # Worker thread count (this value is passed to Configuration#setWorkerThreads)
  workCount: 100
  # Whether custom (non Socket.IO) requests are allowed
  allowCustomRequests: true
  # Protocol upgrade timeout in milliseconds (default 10 s): time allowed for the HTTP handshake to upgrade to ws
  upgradeTimeout: 10000
  # Ping timeout in milliseconds (default 60 s): a timeout event is fired if no heartbeat arrives within this interval
  pingTimeout: 60000
  # Ping interval in milliseconds (default 25 s): how often the client sends a heartbeat to the server
  pingInterval: 25000
  # Maximum content length of an HTTP exchange
  maxHttpContentLength: 1048576
  # Maximum payload length processed per frame; guards the server against oversized-payload attacks
  maxFramePayloadLength: 1048576

3. 实现Spring配置类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Socket.IO settings bound from configuration keys under the {@code socket} prefix.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. All fields are wrapper types,
 * so a key that is missing from the configuration is left as {@code null}.
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 15:22
 */
@Data
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {

    /** Port the Socket.IO server listens on ({@code socket.port}). */
    private Integer port;

    /** Value of {@code socket.workCount}; applied as the server's worker thread count. */
    private Integer workCount;

    /** Whether custom requests are allowed ({@code socket.allowCustomRequests}). */
    private Boolean allowCustomRequests;

    /** Timeout, in milliseconds, for upgrading the HTTP handshake to ws ({@code socket.upgradeTimeout}). */
    private Integer upgradeTimeout;

    /** Ping timeout in milliseconds ({@code socket.pingTimeout}). */
    private Integer pingTimeout;

    /** Ping (heartbeat) interval in milliseconds ({@code socket.pingInterval}). */
    private Integer pingInterval;

    /** Maximum payload length processed per frame ({@code socket.maxFramePayloadLength}). */
    private Integer maxFramePayloadLength;

    /** Maximum content length of an HTTP exchange ({@code socket.maxHttpContentLength}). */
    private Integer maxHttpContentLength;


}
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.nuorui.common.config.properties.SocketProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * Spring configuration that builds the netty-socketio {@link SocketIOServer} from
 * {@link SocketProperties} and enables annotation-based event handlers.
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 15:23
 */
@Configuration
@EnableConfigurationProperties(SocketProperties.class)
public class SocketConfig {

    @Resource
    private SocketProperties properties;

    /**
     * Creates the Socket.IO server bean. The server is only constructed here, not started.
     *
     * <p>Every property is a nullable wrapper type, so each value is checked before it is
     * handed to a primitive setter: passing {@code null} would otherwise fail with a bare
     * {@code NullPointerException} during auto-unboxing. Optional settings that are not
     * configured are simply not applied, leaving the library's own defaults in effect.
     *
     * @return the configured, not yet started, server
     * @throws IllegalStateException if {@code socket.port} is not configured
     */
    @Bean
    public SocketIOServer socketIOServer() {
        // The port has no sensible fallback here, so report its absence explicitly.
        if (properties.getPort() == null) {
            throw new IllegalStateException("socket.port must be configured");
        }

        // Fully qualified because this class shares the simple names Configuration/SocketConfig.
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setPort(properties.getPort());

        com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
        socketConfig.setReuseAddress(true);
        config.setSocketConfig(socketConfig);

        if (properties.getWorkCount() != null) {
            config.setWorkerThreads(properties.getWorkCount());
        }
        if (properties.getAllowCustomRequests() != null) {
            config.setAllowCustomRequests(properties.getAllowCustomRequests());
        }
        if (properties.getUpgradeTimeout() != null) {
            config.setUpgradeTimeout(properties.getUpgradeTimeout());
        }
        if (properties.getPingTimeout() != null) {
            config.setPingTimeout(properties.getPingTimeout());
        }
        if (properties.getPingInterval() != null) {
            config.setPingInterval(properties.getPingInterval());
        }
        if (properties.getMaxHttpContentLength() != null) {
            config.setMaxHttpContentLength(properties.getMaxHttpContentLength());
        }
        if (properties.getMaxFramePayloadLength() != null) {
            config.setMaxFramePayloadLength(properties.getMaxFramePayloadLength());
        }

        return new SocketIOServer(config);
    }

    /**
     * Enables support for the Socket.IO handler annotations (such as {@code @OnConnect})
     * on Spring beans.
     *
     * @param socketServer the server that annotated handlers are registered with
     * @return the annotation scanner bean
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

4. 实现服务端

import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.google.common.collect.Maps;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * In-memory registry of connected Socket.IO clients, grouped by vessel MMSI and then
 * by Socket.IO session id.
 *
 * <p>Connect/disconnect callbacks and message-pushing callers may run on different
 * threads, and the per-MMSI maps are plain {@link HashMap}s, so every access to the
 * registry is serialized on a single lock.
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 16:01
 */
@Component
public class ClientCache {

    /**
     * MMSI -> (session id -> client). Also used as the lock guarding the nested maps.
     */
    private static final Map<String, HashMap<UUID, SocketIOClient>> CLIENTS_BY_MMSI = Maps.newConcurrentMap();

    /**
     * Registers a client connection under the given MMSI. Blank MMSIs are ignored.
     *
     * @param mmsi           vessel MMSI
     * @param sessionId      session id of the page
     * @param socketIOClient connection (channel) belonging to that page
     */
    public void saveClient(String mmsi, UUID sessionId, SocketIOClient socketIOClient) {
        if (StrUtil.isBlank(mmsi)) {
            return;
        }
        synchronized (CLIENTS_BY_MMSI) {
            // Create-and-insert happens under the lock so two concurrent connects for the
            // same MMSI cannot each create a map and overwrite one another.
            CLIENTS_BY_MMSI.computeIfAbsent(mmsi, key -> new HashMap<>()).put(sessionId, socketIOClient);
        }
    }

    /**
     * Returns all client connections registered for an MMSI.
     *
     * <p>The result is a snapshot copy: callers can iterate it safely while clients
     * connect or disconnect, and changes to it do not affect the registry.
     *
     * @param mmsi vessel MMSI
     * @return session id -> client for that MMSI; empty (never {@code null}) when the
     *         MMSI is {@code null} or has no registered clients
     */
    public HashMap<UUID, SocketIOClient> getMmsiClient(String mmsi) {
        HashMap<UUID, SocketIOClient> snapshot = new HashMap<>();
        if (mmsi == null) {
            return snapshot;
        }
        synchronized (CLIENTS_BY_MMSI) {
            HashMap<UUID, SocketIOClient> sessions = CLIENTS_BY_MMSI.get(mmsi);
            if (sessions != null) {
                snapshot.putAll(sessions);
            }
        }
        return snapshot;
    }

    /**
     * Removes one page connection of an MMSI. Does nothing when the MMSI or the session
     * is not registered.
     *
     * @param mmsi      vessel MMSI
     * @param sessionId session id of the page to remove
     */
    public void deleteSessionClient(String mmsi, UUID sessionId) {
        if (mmsi == null) {
            return;
        }
        synchronized (CLIENTS_BY_MMSI) {
            HashMap<UUID, SocketIOClient> sessions = CLIENTS_BY_MMSI.get(mmsi);
            if (sessions == null) {
                return;
            }
            sessions.remove(sessionId);
            // Drop the per-MMSI map once its last session is gone so the registry does
            // not accumulate empty entries.
            if (sessions.isEmpty()) {
                CLIENTS_BY_MMSI.remove(mmsi);
            }
        }
    }
}
import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Socket.IO connection lifecycle handler: registers clients in {@link ClientCache} when
 * they connect and removes them when they disconnect.
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 16:33
 */
@Slf4j
@Component
public class SocketConnection {

    /** Name of the handshake URL query parameter that carries the client's MMSI. */
    private static final String MMSI_PARAM = "mmsi";

    @Resource
    private ClientCache clientCache;

    /**
     * Handles a client connection by caching it under the MMSI from the handshake URL.
     *
     * @param client the client that connected
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        String mmsi = client.getHandshakeData().getSingleUrlParam(MMSI_PARAM);
        UUID sessionId = client.getSessionId();
        if (StrUtil.isBlank(mmsi)) {
            // Without an MMSI the client cannot be cached, so it can never be looked up
            // for a push; make that visible instead of logging a normal connect.
            log.warn("客户端:{}未携带mmsi参数,未加入缓存", sessionId);
            return;
        }
        clientCache.saveClient(mmsi, sessionId, client);

        log.info("客户端:{}|{}已连接", mmsi, sessionId);
    }

    /**
     * Handles a client disconnect by removing its session from the cache. Clients that
     * connected without an MMSI were never cached and are ignored.
     *
     * @param client the client that disconnected
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        String mmsi = client.getHandshakeData().getSingleUrlParam(MMSI_PARAM);
        if (StrUtil.isBlank(mmsi)) {
            return;
        }
        UUID sessionId = client.getSessionId();
        clientCache.deleteSessionClient(mmsi, sessionId);

        log.info("客户端:{}|{}已离线", mmsi, sessionId);
    }
}
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

/**
 * Starts the Socket.IO server once the Spring Boot application has started, and stops
 * it again when the application context shuts down.
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 15:43
 */
@Slf4j
@Component
@Order(1)
public class SocketServer implements CommandLineRunner {

    /**
     * The server instance managed by this runner.
     */
    private final SocketIOServer socketIOServer;

    /**
     * Set once {@link SocketIOServer#start()} has returned, so shutdown only stops a
     * server that this runner actually started.
     */
    private volatile boolean started;

    @Autowired
    public SocketServer(SocketIOServer socketIOServer) {
        this.socketIOServer = socketIOServer;
    }

    /**
     * Starts the Socket.IO server.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(String... args) {
        socketIOServer.start();
        started = true;
    }

    /**
     * Stops the Socket.IO server on context shutdown so the server is not left running
     * after the application context is closed. No-op if the server was never started.
     */
    @PreDestroy
    public void stop() {
        if (started) {
            started = false;
            socketIOServer.stop();
        }
    }
}

5. 服务端发消息给客户端

/**
 * Demo endpoint that pushes a message from the server to connected clients.
 *
 * NOTE(review): no {@code @RestController}/{@code @Controller} annotation and no imports
 * are visible in this snippet — confirm they are present in the real source file.
 */
public class SocketController {

    @Resource
    private ClientCache clientCache;

    /**
     * Sends the {@code "event"} event to every cached connection of MMSI {@code "2222"}.
     * Does nothing when no client is cached for that MMSI.
     */
    @PostMapping("/test1")
    public void test() {
        HashMap<UUID, SocketIOClient> userClient = clientCache.getMmsiClient("2222");
        // The cache lookup yields null when nothing is registered for the MMSI;
        // iterating it unguarded would fail with a NullPointerException.
        if (userClient == null) {
            return;
        }
        userClient.forEach((uuid, socketIOClient) -> {
            // Push the message to this client connection.
            socketIOClient.sendEvent("event", "服务端推送消息");
        });
    }

}

————————————————————————————————————————————————————————————————————————————————————————

客户端

1. 添加依赖

<dependency>
    <groupId>io.socket</groupId>
    <artifactId>socket.io-client</artifactId>
    <version>1.0.0</version>
</dependency>

2. 客户端监听

// Must point at the port the server is configured with (socket.port: 9090 in the YML above).
String url = "http://127.0.0.1:9090";
try {
    IO.Options options = new IO.Options();
    options.transports = new String[]{"websocket"};
    options.reconnectionAttempts = 2;
    // Delay between reconnection attempts (ms)
    options.reconnectionDelay = 1000;
    // Connection timeout (ms)
    options.timeout = 500;
    // mmsi: unique identifier, sent to the server, which caches the connection under it
    final Socket socket = IO.socket(url + "?mmsi=2222", options);

    socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));

    // Each listener guards against an empty payload instead of indexing objects[0] blindly.

    // Custom event `connected` -> server's connection-success message
    socket.on("connected", objects -> log.debug("服务端:{}", objects.length > 0 ? objects[0] : null));

    // Custom event `push_data_event` -> message from the server
    socket.on("push_data_event", objects -> log.debug("服务端:{}", objects.length > 0 ? objects[0] : null));

    // Custom event `myBroadcast` -> broadcast message from the server
    socket.on("myBroadcast", objects -> log.debug("服务端:{}", objects.length > 0 ? objects[0] : null));

    // `event` is the event name the server's /test1 endpoint emits; without this
    // listener that push would never be observed by this client.
    socket.on("event", objects -> log.debug("服务端:{}", objects.length > 0 ? objects[0] : null));

    socket.connect();

    socket.emit("push_data_event", "发送数据 " + System.currentTimeMillis());
} catch (Exception e) {
    // Report through the logger (with the stack trace) rather than printing to stderr.
    log.error("Socket.IO客户端连接失败: " + url, e);
}
原文地址:https://www.cnblogs.com/fangts/p/15044173.html