netty(一)

 https://www.cnblogs.com/imstudy/p/9908791.html

https://gitchat.csdn.net/activity/5d2fd4b4cfd4917a259d1a2a

线程组EventLoopGroup,监听客户端请求;处理客户端相关操作线程组,负责处理与客户端的请求操作。

netty对JDK自带的NIO的API进行了封装。   |     netty是异步高性能的通信框架。

netty的IO线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端连接。当线程从某客户端Socket通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。

线程通常将非阻塞IO的空闲时间用于在其他通道上执行IO操作,所以单独的线程可以管理多个输入和输出通道。

 1、MainReactor负责客户端的连接,并将请求通过acceptor转交给SubReactor;

 2、SubReactor负责相应通道的IO读写请求;多个read请求会去线程池里面找线程进行系列操作:解码-处理-编码-发送数据,叫做worker threads;

3、说明:虽然netty的线程模型基于主从Reactor多线程,借用了MainReactor和SubReactor的结构,但是实际实现上SubReactor和Worker线程在同一个线程池中。

1 EventLoopGroup bossGroup = new NioEventLoopGroup();
2 EventLoopGroup workerGroup = new NioEventLoopGroup();
3 ServerBootstrap server = new ServerBootstrap();
4 server.group(bossGroup, workerGroup)
5   .channel(NioServerSocketChannel.class)

上面代码中的bossGroup和workerGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程池:1)、bossGroup线程池只是在bind某个端口后,获得其中一个线程作为MainReactor,专门处理端口的Accept事件,每个端口对应一个Boss线程;

           2)、workerGroup线程池会被各个SubReactor和Worker线程充分利用。

【异步操作】

1、netty中的IO操作是异步的,bind、write、connect操作会简单的返回一个ChannelFuture,调用者并不能立即获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

2、当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

isDone 判断当前操作是否完成
isSuccess 判断已完成的当前操作是否成功
getCause 获取已完成的当前操作失败的原因
isCancelled 判断已完成的当前操作是否被取消
addListener 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
1 serverBootstrap.bind(port).addListener(future -> {
2        if(future.isSuccess()) {
3            System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
4        } else{
5            System.err.println("端口["+ port + "]绑定失败!");
6        }
7 });

【netty架构设计】

 【模块组件】

1、BootStrap和ServerBootStrap:

一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,serverBootstrap是服务端启动引导类。

2、Future、ChannelFuture

3、Channel

Netty网络通信的组件,能够用于执行网络IO操作。Channel为用户提供:

NioSocketChannel 异步的客户端 TCP Socket 连接
NioServerSocketChannel 异步的服务器端 TCP Socket 连接
NioDatagramChannel 异步的 UDP 连接
NioSctpChannel 异步的客户端 Sctp 连接
NioSctpServerChannel 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO

4、selector

netty基于selector对象实现了IO多路复用,通过selector一个线程可以监听多个连接的Channel事件。

当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

5、NioEventLoop

NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行IO任务和非IO任务。

I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。

非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等。

6、NioEventLoopGroup

NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。

7、ChannelHandler

ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。

 8、ChannelHandlerContext

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

9、ChannelPipline

保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。

ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。

在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:

一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext中又关联着一个ChannelHandler。

入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的Handler,出站事件会从链表tail往前传递到最后一个出站的Handler,两种类型的Handler互不干扰。

 典型的初始化并启动netty服务端的代码过程如下:

 1 public static void main(String[] args) {
 2        // 创建mainReactor
 3        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
 4        // 创建工作线程组
 5        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 6        final ServerBootstrap serverBootstrap = new ServerBootstrap();
 7        serverBootstrap
 8                 // 组装NioEventLoopGroup
 9 
10                .group(bossGroup, workerGroup)
11 
12                 // 设置channel类型为NIO类型
13 
14                .channel(NioServerSocketChannel.class)
15 
16                // 设置连接配置参数
17 
18                .option(ChannelOption.SO_BACKLOG, 1024)
19 
20                .childOption(ChannelOption.SO_KEEPALIVE, true)
21 
22                .childOption(ChannelOption.TCP_NODELAY, true)
23 
24                // 配置入站、出站事件handler    
27                .childHandler(newChannelInitializer<NioSocketChannel>() {
28                    @Override
29                    protectedvoidinitChannel(NioSocketChannel ch) {
30                        // 配置入站、出站事件channel
31                        ch.pipeline().addLast(...);
32                        ch.pipeline().addLast(...);
33                    }
34    });
35 
36  
37 
38        // 绑定端口
39        int port = 8080;
40        serverBootstrap.bind(port).addListener(future -> {
41            if(future.isSuccess()) {
42                System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
43 
44            } else{
45                System.err.println("端口["+ port + "]绑定失败!");
46            }
47        });
48 
49 }

 其中任务队列中的Task有3种典型的使用场景:

① 用户程序自定义的普通任务

1 ctx.channel().eventLoop().execute(new Runnable() {
2    @Override
3    public void run() {
4        //...
5    }
6 });

② 非当前Reactor线程调用Channel的各种方法

例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费。

③ 用户自定义定时任务

1 ctx.channel().eventLoop().schedule(newRunnable() {
2    @Override
3    public void run() {
4  
5    }
7 }, 60, TimeUnit.SECONDS);
原文地址:https://www.cnblogs.com/lupj/p/13276644.html