JAVA Netty入门Demo实例代码(自写测试可用)实现客户端服务器端互传数据

首先创建MAVEN项目

pom.xml 写入

1         <!-- netty -->
2         <dependency>
3             <groupId>io.netty</groupId>
4             <artifactId>netty-all</artifactId>
5             <version>4.1.36.Final</version>
6         </dependency>

服务器端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

public class EchoServer {

    private final int port;

              public EchoServer(int port) {
                  this.port = port;
              }
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
                 try {
                         ServerBootstrap sb = new ServerBootstrap();
                         sb.group(group) // 绑定线程池
                                .channel(NioServerSocketChannel.class) // 指定使用的channel
                                .localAddress(this.port)// 绑定监听端口
                                 .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作

                                           @Override
                                  protected void initChannel(SocketChannel ch) throws Exception {
                                                          System.out.println("connected...; Client:" + ch.remoteAddress());
//                                               ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
//                                               ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
                                                          ch.pipeline().addLast(new EchoServerHandler()); // 客户端触发操作
                                                      }
                              });
                          ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
                          System.out.println(EchoServer.class + " started and listen on " + cf.channel().localAddress());
                          cf.channel().closeFuture().sync(); // 关闭服务器通道
                      } finally {
                          group.shutdownGracefully().sync(); // 释放线程池资源
                      }
              }

              public static void main(String[] args) throws Exception {
                  new EchoServer(65535).start(); // 启动
              }
    }

  服务器端操作

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.Date;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //        数据读取事件
//        System.out.println("我是读取事件" + msg.toString());
//        ctx.write(msg);
        try {
            ByteBuf buf = (ByteBuf)msg;
//创建目标大小的数组
            byte[] barray = new byte[buf.readableBytes()];
//把数据从bytebuf转移到byte[]
            buf.getBytes(0,barray);
            //将byte[]转成字符串用于打印
            String str=new String(barray);

            if (str.length()>0)
            {
                System.out.println(str);
                System.out.println("收到消息回复一条消息给客户端");
                System.out.println("client channelActive..");
                ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端发一条数据给客户端"+new Date().toString(), CharsetUtil.UTF_8)); // 必须有flush
                System.out.flush();

            }
            else
            {
                System.out.println("不能读啊");
            }
            buf.release();
        }finally {
//buf.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channelReadComplete..");
        // 第一种方法:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        //ctx.flush(); // 第二种方法:在client端关闭channel连接,这样的话,会触发两次channelReadComplete方法。
        //ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。
    }
//    @Override
//    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        System.out.println("client channelActive..");
//        ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端发一条数据给客户端", CharsetUtil.UTF_8)); // 必须有flush
//
//    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server occur exception:" + cause.getMessage());
        cause.printStackTrace();
        ctx.close(); // 关闭发生异常的连接
    }
}

  

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import java.net.InetSocketAddress;

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient() {
        this(0);
    }

    public EchoClient(int port) {
        this("localhost", port);
    }

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group) // 注册线程池
                    .channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
                    .remoteAddress(new InetSocketAddress(this.host, this.port)) // 绑定连接端口和host信息
                    .handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("连接connected...");
//                            ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
//                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            System.out.println("created..");

            ChannelFuture cf = b.connect().sync(); // 异步连接服务器
            System.out.println("connected..."); // 连接完成

            cf.channel().closeFuture().sync(); // 异步等待关闭连接channel
            System.out.println("closed.."); // 关闭完成
        } finally {
            group.shutdownGracefully().sync(); // 释放线程池资源
        }
    }

    public static void main(String[] args) throws Exception {
     new EchoClient("127.0.0.1", 65535).start(); // 连接127.0.0.1/65535,并启动
        System.out.println("===================================");
//        new EchoClient("127.0.0.1", 65535).start();
    }
}

  客户端操作

import java.nio.charset.Charset;
import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channelActive..");
        ctx.writeAndFlush(Unpooled.copiedBuffer("客户端发送一条新数据给你"+new Date().toString(), CharsetUtil.UTF_8)); // 必须有flush

        // 必须存在flush
        // ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
//         ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("读取数据..");
        ByteBuf buf = msg.readBytes(msg.readableBytes());
        System.out.println("Client received:" +buf.toString(Charset.forName("utf-8")));
        //ctx.channel().close().sync();// client关闭channel连接
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

服务器接收发送数据:


客户端接收发送数据

原文地址:https://www.cnblogs.com/wyq-study/p/14637952.html