SpringBoot整合Netty实现socket通讯简单demo

依赖

 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

还用到了

   <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
ChatDto.java
import lombok.Data;
import lombok.experimental.Accessors;

/**
 * 传输实体类
 */
@Data
@Accessors(chain = true)
public class ChatDto {

    /**
     * 客户端ID 唯一
     */
    private String clientId;

    /**
     * 发送的消息
     */
    private String msg;
}

NettyChannelMap.java

import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 存放连接的channel对象
 */
public class NettyChannelMap {
    private static Map<String, SocketChannel> map = new ConcurrentHashMap<String, SocketChannel>();

    public static void add(String clientId, SocketChannel socketChannel) {
        map.put(clientId, socketChannel);
    }

    public static Channel get(String clientId) {
        return map.get(clientId);
    }

    public static void remove(SocketChannel socketChannel) {
        for (Map.Entry entry : map.entrySet()) {
            if (entry.getValue() == socketChannel) {
                map.remove(entry.getKey());
            }
        }
    }
}

NettyTcpServerBootstrap.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * 服务端启动类
 */
public class NettyTcpServerBootstrap {
    private int port;
    private SocketChannel socketChannel;
    public NettyTcpServerBootstrap(int port) throws InterruptedException {
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup worker=new NioEventLoopGroup();
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(boss,worker);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        /**
         * 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
         */
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        /**
         * 保持长连接状态
         */
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline p = socketChannel.pipeline();
                p.addLast(new ObjectEncoder());
                p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                p.addLast(new IdleStateHandler(10,0,0, TimeUnit.SECONDS));
                p.addLast(new NettyServerHandler());
            }
        });
        ChannelFuture f= bootstrap.bind(port).sync();
        if(f.isSuccess()){
            System.out.println("socket server start---------------");
        }
    }

}

NettyServerHandler.java

import com.alibaba.fastjson.JSON;
import com.example.netty.dto.ChatDto;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        /**
         * channel失效,从Map中移除
         */
        NettyChannelMap.remove((SocketChannel) ctx.channel());
    }


    /**
     * 服务端 接收到 客户端 发的数据
     * @param context
     * @param obj
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception {

        log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj);

        SocketChannel socketChannel = (SocketChannel) context.channel();
        ChatDto dto = JSON.parseObject(obj.toString(), ChatDto.class);
        /**
         * 客户端ID
         */
        String clientId = dto.getClientId();

        if (clientId == null) {
            /**
             * 心跳包处理
             */
            ChatDto pingDto=new ChatDto();
            pingDto.setMsg("服务端收到心跳包,返回响应");
            socketChannel.writeAndFlush(JSON.toJSONString(pingDto));
            return;
        }

        Channel channel = NettyChannelMap.get(clientId);

        if (channel==null){
            /**
             * 存放所有连接客户端
             */
            NettyChannelMap.add(clientId, socketChannel);
            channel=socketChannel;
        }


        /**
         * 服务器返回客户端消息
         */
        ChatDto returnDto=new ChatDto();
        returnDto.setClientId(clientId).setMsg("我是服务端,收到你的消息了");

        channel.writeAndFlush(JSON.toJSONString(returnDto));


         ReferenceCountUtil.release(obj);
    }
}

客户端

NettyClientBootstrap.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;


/**
 * 客户端启动类
 */
public class NettyClientBootstrap {
    private int port;
    private String host;
    public SocketChannel socketChannel;
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);

    public NettyClientBootstrap(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }

    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.group(eventLoopGroup);
        bootstrap.remoteAddress(host, port);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0));
                socketChannel.pipeline().addLast(new ObjectEncoder());
                socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                socketChannel.pipeline().addLast(new NettyClientHandler());
            }
        });
        ChannelFuture future = bootstrap.connect(host, port).sync();
        if (future.isSuccess()) {
            socketChannel = (SocketChannel) future.channel();
            System.out.println("connect server  成功---------");
        }
    }

}

NettyClientHandler.java

import com.alibaba.fastjson.JSON;
import com.example.netty.dto.ChatDto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    /**
                     *  利用写空闲发送心跳检测消息
                     */
                    ChatDto pingDto=new ChatDto();
                    pingDto.setMsg("我是心跳包");
                    ctx.writeAndFlush(JSON.toJSONString(pingDto));
                    log.info("send ping to server----------");
                    break;
                default:
                    break;
            }
        }
    }


    /**
     * 客户端接收到服务端发的数据
     * @param channelHandlerContext
     * @param obj
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj)  {

        log.info(">>>>>>>>>>>>>客户端接收到消息:{}", obj);


        ReferenceCountUtil.release(obj);
    }
}

SprigBoot启动类添加服务端启动代码

@SpringBootApplication
public class NettyApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyApplication.class, args);


        try {
            NettyTcpServerBootstrap bootstrap = new NettyTcpServerBootstrap(9999);
            bootstrap.start();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("server socket 启动失败");
        }
    }

}

ChatController.java

import com.alibaba.fastjson.JSON;
import com.example.netty.dto.ChatDto;
import com.example.netty.socket.NettyClientBootstrap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * 客户端消息发送控制器
 */
@RestController
@Slf4j
public class ChatController {

    private static String clientId=UUID.randomUUID().toString();


    public static NettyClientBootstrap bootstrap;

    /**
     * 发送消息demo
     * @param msg
     */
    @PostMapping(value = "/send")
    public void send(String msg) {
        if (bootstrap == null) {
            try {
                /**
                 * 连接 输入服务器的端口和ip
                 */
                bootstrap = new NettyClientBootstrap(9999, "localhost");
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.error(">>>>>>>>> server socket 连接失败");
            }
        }
        /**
         *   发送消息
         */
        ChatDto dto=new ChatDto();
        dto.setClientId(clientId).setMsg(msg);
        /**
         * json字符串发送
         */
        bootstrap.socketChannel.writeAndFlush(JSON.toJSONString(dto));

    }
}

访问

-----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
原文地址:https://www.cnblogs.com/pxblog/p/15502906.html