netty1 快速入门

Netty是一个高性能、异步事件驱动的网路通信框架 ,由于精力有限,本人并没有对其源 码做了特别细致的研究。如果下面的内容有错误或不严谨的地方,也请大家指正和谅解。
Netty的线程模型是Reactor主从模型的变种,去掉了线程池,使用串行化实现。
Reactor主从模型如下图所示

(mainReactor负责监听server socket,accept新连接;并将建立的socket分派给subReactor。

subReactor负责多路分离已连接的socket,读写网络数据,对业务处理功能,其扔给worker线程池完成。通常,subReactor个数上可与CPU个数等同)

Netty中Reactor模式的参与者主要有下面一些组件:

Selector
EventLoopGroup/EventLoop
ChannelPipeline


Selector即为NIO中提供的多路复用器,这里不再阐述。

EventLoopGroup/EventLoop

EventLoopGroup是一组EventLoop的抽象,在Netty服务器编程中我们需要BossEventLoopGroup和WorkerEventLoopGroup两个EventLoopGroup来进行工作。通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程,也就是说BossEventLoopGroup的线程数参数为1。BossEventLoop主要负责接收客户端的连接并

SocketChannel交给WorkerEventLoopGroup来进行处理。

ChannelPipeline

ChannelPipeline在Reactor模式中担任请求处理器的角色,通过调用相关的handler进行相对应的处理。

以下是第一个入门程序

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
	public static void main(String[] args) throws Exception {
		// 1 第一个线程组 是用于接收Client端连接的
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		// 2 第二个线程组 是用于实际的业务处理操作的
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		// 3 创建一个辅助类Bootstrap,就是对我们的Server进行一系列的配置
		ServerBootstrap bbootStarp = new ServerBootstrap();
		// 把俩个工作线程组加入进来
		bbootStarp.group(bossGroup, workerGroup)
				// 指定使用NioServerSocketChannel这种类型的通道
				.channel(NioServerSocketChannel.class)
				// 一定要使用 childHandler 去绑定具体的 事件处理器
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel sc) throws Exception {
						sc.pipeline().addLast(new ServerHandler());
					}
				});
		// 绑定指定的端口 进行监听
		ChannelFuture channelFuture = bbootStarp.bind(8765).sync();

		// 让其不关闭 
		channelFuture.channel().closeFuture().sync();
		bossGroup.shutdownGracefully();
		workerGroup.shutdownGracefully();

	}
}

  

 

  

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
 
public class ServerHandler extends ChannelInboundHandlerAdapter {
 
       //处理客户端发送过来的数据
	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
        ByteBuf byteBuffer = (ByteBuf) msg;
        byte[] data = new byte[byteBuffer.readableBytes()];
        byteBuffer.readBytes(data);
        String sData = new String(data,"utf-8");
        System.out.println("server "+sData);
  
        //写操作完成以后就断开连接 
        ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes())).addListener(ChannelFutureListener.CLOSE);
         
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
 
}

  

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

	public static void main(String[] args) throws Exception {
		
		EventLoopGroup workgroup = new NioEventLoopGroup();
	
		
		Bootstrap boot = new Bootstrap();
	
		boot.group(workgroup)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		
		ChannelFuture channelFuture = boot.connect("127.0.0.1", 8765).sync();
		
		//buf
		channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
		
		channelFuture.channel().closeFuture().sync();
		workgroup.shutdownGracefully();
		
	}
}

  

 

  

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
 
public class ClientHandler extends ChannelInboundHandlerAdapter {
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception  {
        try {
            // do something 
            ByteBuf buf = (ByteBuf) msg;
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            String request = new String(data, "utf-8");
            System.out.println("Client: " + request);
 
        } finally {
            // 如果没有调用write或者writeAndFlush 需要手动释放msg
            ReferenceCountUtil.release(msg);
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

  

  可以看到,server 和client的创建连接或者发送请求的代码的都是相对固定的,实际开发中 我们专注写handler 就好。

原文地址:https://www.cnblogs.com/javabigdata/p/7444944.html