Netty 入门示例

  server 服务端

  入口

package com.sxmd.gateway.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Server-side entry point: boots a Netty TCP server and blocks until the
 * listening channel is closed.
 *
 * @author cy
 * @date 2019-09-03 8:51
 * Version 1.0
 */
public class ServerMain {

    /** Port used when the server is launched through {@link #main(String[])}. */
    private static final int DEFAULT_PORT = 8754;

    /** Value passed to {@code SO_BACKLOG} on the listening socket. */
    private static final int SO_BACKLOG = 100;

    /**
     * Starts the server on the default port.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ServerMain.bind(DEFAULT_PORT);
    }

    /**
     * Binds a server socket to {@code port} and blocks the calling thread until
     * the server channel closes. Both event-loop groups are shut down before
     * this method returns, whether it completes normally or not.
     *
     * @param port TCP port to listen on
     */
    public static void bind(int port) {
        // boss handles the listening socket, worker handles the accepted connections.
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap boot = new ServerBootstrap();
            boot.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, SO_BACKLOG)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerPipeLine());

            // Wait for the bind to finish before announcing that we are listening.
            ChannelFuture bound = boot.bind(port).sync();
            System.out.println("服务端开始监听,等待客户端连接");

            // Park this thread until the server channel is closed.
            bound.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can
            // still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}
package com.sxmd.gateway.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;


/**
 * Channel initializer for connections accepted by the server: splits the
 * inbound byte stream into frames on the {@code $} delimiter and passes each
 * frame to {@link ServerInHandler}.
 *
 * @author cy
 * @date 2019-09-03 9:10
 * Version 1.0
 */
public class ServerPipeLine extends ChannelInitializer<SocketChannel> {

    /** Maximum frame length, in bytes, passed to the delimiter-based frame decoder. */
    private static final int MAX_FRAME_LENGTH = 2018;

    /** Character that terminates every message on the wire. */
    private static final String DELIMITER = "$";

    /**
     * Installs the frame decoder and the business handler on a new channel.
     *
     * @param channel the freshly accepted client channel
     * @throws Exception declared by the overridden method; nothing in this body throws a checked exception
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Encode the delimiter with an explicit charset rather than the
        // platform default so the byte pattern is the same on every JVM.
        ByteBuf delimiter = Unpooled.copiedBuffer(
                DELIMITER.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        ChannelPipeline pipeline = channel.pipeline();
        // Frame on the delimiter so that packets merged or split by TCP are
        // re-assembled into whole messages (the "sticky packet" problem).
        pipeline.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
        // No StringDecoder is installed: ServerInHandler decodes the ByteBuf itself.
        // pipeline.addLast(new StringDecoder());
        pipeline.addLast(new ServerInHandler());
    }
}
package com.sxmd.gateway.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.net.SocketAddress;

/**
 * Server-side inbound handler: prints every received frame and replies with a
 * greeting that echoes the frame's text.
 *
 * @author cy
 * @date 2019-09-03 9:14
 * Version 1.0
 */
public class ServerInHandler extends SimpleChannelInboundHandler<Object> {

    /** Charset used for both decoding requests and encoding responses. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Decodes one frame as UTF-8 text, logs it, and writes back a
     * {@code $}-terminated reply.
     *
     * @param cxf context of this handler in the channel pipeline
     * @param msg the inbound frame; cast to {@link ByteBuf} without a type check
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext cxf, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] payload = new byte[buf.readableBytes()];
        buf.readBytes(payload);
        String body = new String(payload, UTF_8);
        System.out.println(Thread.currentThread().getName() + "服务端收到的消息:" + body);

        // Reply. The response must be encoded with the same charset used to
        // decode the request; relying on the platform default would corrupt
        // the non-ASCII text on JVMs whose default charset is not UTF-8.
        String respMsg = "你好," + body + ",我收到了你的消息$";
        ByteBuf response = Unpooled.copiedBuffer(respMsg, UTF_8);
        cxf.writeAndFlush(response);
    }

    /**
     * Flushes any writes still pending once the current read batch is done.
     *
     * @param ctx context of this handler in the channel pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // write() only queues a message in the outbound buffer; flush() is what
        // actually pushes the queued messages to the SocketChannel. Flushing
        // once per read batch avoids waking the selector for every message.
        ctx.flush();
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception raised while processing the channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // On error, close the context to release the handles and other
        // resources associated with the connection.
        cause.printStackTrace();
        ctx.close();
    }

}

客户端 client

package com.sxmd.gateway.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Client-side entry point: starts one thread per client, each of which opens a
 * Netty connection to the server and blocks until that connection closes.
 *
 * @author cy
 * @date 2019-09-03 8:51
 * Version 1.0
 */
public class ClientMain {

    /** Address of the server the example connects to. */
    private static final String SERVER_HOST = "192.168.141.124";

    /** Port of the server the example connects to. */
    private static final int SERVER_PORT = 8754;

    /** Number of client threads started by {@link #main(String[])}. */
    private static final int CLIENT_COUNT = 1;

    /**
     * Launches the client threads.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            new Thread(new ClientThread()).start();
        }
    }

    /** Runnable that opens a single client connection and waits for it to close. */
    static class ClientThread implements Runnable {

        /** Connects to the configured server address and port. */
        @Override
        public void run() {
            connect(SERVER_HOST, SERVER_PORT);
        }

        /**
         * Connects to {@code host:port} and blocks the calling thread until the
         * channel closes. The event-loop group is shut down before returning.
         *
         * @param host server host name or IP address
         * @param port server TCP port
         */
        public void connect(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ClientPipeLine());

                // Wait for the connect attempt to complete.
                ChannelFuture connected = b.connect(host, port).sync();
                System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接..........");

                // Wait for the client link to be closed.
                connected.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently consuming it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }

    }

}
package com.sxmd.gateway.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Channel initializer for the client connection: splits the inbound byte
 * stream into frames on the {@code $} delimiter and passes each frame to
 * {@link ClientInHandler}.
 *
 * @author cy
 * @date 2019-09-03 9:35
 * Version 1.0
 */
public class ClientPipeLine extends ChannelInitializer<SocketChannel> {

    /** Maximum frame length, in bytes, passed to the delimiter-based frame decoder. */
    private static final int MAX_FRAME_LENGTH = 2018;

    /** Character that terminates every message on the wire. */
    private static final String DELIMITER = "$";

    /**
     * Installs the frame decoder and the business handler on a new channel.
     *
     * @param channel the client channel being initialised
     * @throws Exception declared by the overridden method; nothing in this body throws a checked exception
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Encode the delimiter with an explicit charset rather than the
        // platform default so the byte pattern is the same on every JVM.
        ByteBuf delimiter = Unpooled.copiedBuffer(
                DELIMITER.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        ChannelPipeline pipeline = channel.pipeline();
        // Frame on the delimiter so that packets merged or split by TCP are
        // re-assembled into whole messages (the "sticky packet" problem).
        pipeline.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
        // No StringDecoder is installed: ClientInHandler decodes the ByteBuf itself.
        // pipeline.addLast(new StringDecoder());
        pipeline.addLast(new ClientInHandler());
    }
}
package com.sxmd.gateway.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side inbound handler: sends ten greeting messages once the connection
 * is active and prints every reply frame it receives.
 *
 * @author cy
 * @date 2019-09-03 9:39
 * Version 1.0
 */
public class ClientInHandler extends SimpleChannelInboundHandler<Object> {

    /** Charset used for both encoding requests and decoding responses. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Number of messages sent as soon as the connection is established. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Invoked by Netty's I/O thread once the TCP link between client and
     * server has been established; sends the initial batch of messages.
     *
     * @param ctx context of this handler in the channel pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户连接服务端成功");
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String reqMsg = "我是客户端" + i + "$";
            // Encode explicitly as UTF-8: the text is non-ASCII, so the
            // platform-default charset would produce different bytes on
            // JVMs whose default is not UTF-8.
            ByteBuf request = Unpooled.copiedBuffer(reqMsg, UTF_8);
            // writeAndFlush queues the buffer and immediately flushes it to the server.
            ctx.writeAndFlush(request);
        }
    }

    /**
     * Flushes any writes still pending once the current read batch is done.
     *
     * @param ctx context of this handler in the channel pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Decodes one reply frame as UTF-8 text and prints it.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the inbound frame; cast to {@link ByteBuf} without a type check
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] payload = new byte[buf.readableBytes()];
        buf.readBytes(payload);
        String body = new String(payload, UTF_8);
        System.out.println(Thread.currentThread().getName() + "服务端返回:" + body);
    }

    /**
     * Prints the failure and closes the channel, releasing the client's resources.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception raised while processing the channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Release resources.
        cause.printStackTrace();
        ctx.close();
    }

}
原文地址:https://www.cnblogs.com/chengyangyang/p/11454286.html