netty源码学习—课堂4

这节课,我想要搞搞清楚,netty里底层read/write究竟是怎样一个模式。

  • read, how, from where, to where
  • write, how, from where, to where

SelectionKey

  • OP_READ=1
  • OP_WRITE=4
  • OP_CONNECT=8
  • OP_ACCEPT=16

Read

1. 如果channel有数据可读,那么相应的selectionKey的OP_READ将被置位。

2. 准备好buffer(从recvBufferPool中获取预估大小的缓存区),以备读出数据后放置;

3. 尝试着从channel中读取数据到buffer中,Reads a sequence of bytes from this channel into the given buffer.

An attempt is made to read up to r bytes from the channel, where r is the number of bytes remaining in the buffer, that is, dst.remaining(), at the moment this method is invoked.

Suppose that a byte sequence of length n is read, where 0 <= n <= r. This byte sequence will be transferred into the buffer so that the first byte in the sequence is at index p and the last byte is at index p + n - 1, where p is the buffer's position at the moment this method is invoked. Upon return the buffer's position will be equal to p + n; its limit will not have changed.

A read operation might not fill the buffer, and in fact it might not read any bytes at all. Whether or not it does so depends upon the nature and state of the channel. A socket channel in non-blocking mode, for example, cannot read any more bytes than are immediately available from the socket's input buffer; similarly, a file channel cannot read any more bytes than remain in the file. It is guaranteed, however, that if a channel is in blocking mode and there is at least one byte remaining in the buffer then this method will block until at least one byte is read.

This method may be invoked at any time. If another thread has already initiated a read operation upon this channel, however, then an invocation of this method will block until the first operation is complete. 

4. 生成一个新的buffer(大小为buffer.size()),将数据拷贝进去。释放老buffer空间,同时更新预估器。

5. 向上fireMessageReceived事件。

疑问:

a) 如果可读的数据有很多,超出buffer大小怎么办? 

最多读到buffer.limit()就满足了,不读了

b) 读到什么时候为止,channel上此时无数据为止?

如果buffer地方充裕,尽量把socket.inputBuffer里的东西全读出来,如果不够了,读满就溜。

c) 如果没数据,会hold住吗,还是等待一段时间?

如果没数据,亲,如果没数据,OP_READ怎么会被置位的,有点悖了,呵呵。如果真没数据,也是不会等的,回头。

d) 读的不一定是完整的消息体,怎么办,读了1.5个消息,那后面这0.5个要如何处理?

凑整一个消息,就反解出一个消息对象。

凑不整,那就累积着(cumulation),等下次read过来凑。

这个前后顺序与完整性,说到底是由底层os/tcp在控制,即一个消息紧接着一个消息过来,不会乱序或来一半。话说这要是真乱序或来一半,可怎么办?

FrameDecoder是Netty Protocol Decode最关键的Decoder,几乎所有和协议解码相关的Decoder都继承自它,那到底解决了什么问题?为什么需要这样的一个部件呢?TCP的传输是基于流的,每个数据包都有可能被分片和然后重组,这时候我们就需要协议去界定一个数据包,通常来说用来方式来确定数据包的边界,一个是基于长度,简单一点就是规定数据包的长度,例如规定每个数据包的长度为100byte,FixedlengthFrameDecoder就是这样,可实际中我们的数据包并不都是固定长度,可以说是大小不一的情况更常见。怎么样解决这个问题呢?我们加一个长度属性的header,标示我们实际内容的大小,这就是LengthFieldBasedFrameDecoder。

回归正题,为什么需要FrameDecoder,来考虑两个极端的问题:
1. 假设我们一帧(数据包)的大小为100byte,然而socket的sendBuffer的大小只有50byte,server端的readbuffer的大小也是50byte。这就意味着我们读两次才能读完成一帧。
2.Netty的NioWorker(Poller)是有读写控制,每次只能读到制定大小的buffer,例如一帧的大小还是100byte,Poller读到1024k,然后触发message receive事件去处理,这样第11帧只能读到24byte,显然不是一完整的帧剩余的76byte需要在下一次poll中接收。
通过这两个问题我们发现,有时候为了读完整的一帧需要累积读多次,就像以上两种情况需要两次,我们不能读到数据就进行处理,我们得需要一个累积器,把分散的buffer累积到累积器中,再进行解码,这样就能避免一帧跨两个buffer。
if (cumulation == null) {
            // the cumulation buffer is not created yet so just pass the input to callDecode(...) method 第一次的时候,累积器还没有创建,这时先解码,然后把剩余的部分加到累积器中。如果每次input都变完整的解码成一帧,没有/剩余则不会创建累积器
            callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
   //如果有剩余放入累积
            if (input.readable()) {
                // seems like there is something readable left in the input buffer. So create the cumulation buffer and copy the input into it
                (this.cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input);
            }
        } else {
            assert cumulation.readable();
            boolean fit = false;
            
            int readable = input.readableBytes();
            int writable = cumulation.writableBytes();
            int w = writable - readable;
            if (w < 0) {
                int readerIndex = cumulation.readerIndex();
                if (w + readerIndex >= 0) {
                    // the input will fit if we discard all read bytes, so do it
                    cumulation.discardReadBytes();
                    fit = true;
                }

一个简单netty client pojo通讯实现,完整的encoder/decoder实例:http://guoc.iteye.com/blog/898394

 
Write
1. 非nio线程,不可以直接写socket。需要放到Queue<WriteTask>,待nioWorker来调度执行。换句话说,只有nioWorker有直接向socket写的权利。
2. nio线程,需要写socket,也不是"想写就能立马写出去的",要在xxx里候着,待socket可写后(OP_WRITExxx),才能逐个的往里写。
3. 且看如何向socket写的:
  • 获得写锁 synchronized(channel.writeLock);
  • 从writeBufferQueue中取出写事件;
  • 总得找个地方放消息buf(从sendBufferPool里开辟事件大小的缓冲区);
  • 将buf中的数据往channel里写,buf.transferTo(ch);
  • 如果写完了,那么释放buf,将事件的future属性置为true。如果没写完,那么可能kernel buffer当时是满的,那么需要做一些动作,标记"我没发完,以及我发到哪儿了",有待channel恢复后继续发。
  • 向上fireWriteComplete事件;
 
疑问:
a) 一次没写完,如何标识?
setOpWrite()。如果channel对写不感兴趣,那么要重置为对写感兴趣。这样,如果channel可写了,或遭遇对端关闭等一系列问题的时候,selectionKey能感受的到并被挑出来接受相应处理。
b) 一次没写完,下次再发的时候,怎么知道从"哪个位置"开始继续发?
channel.currentWriteBuffer里会记录上次没发完的东西
future.setProgress() 这个动作我看貌似没什么实际意义呀
 
 
 
原文地址:https://www.cnblogs.com/alipayhutu/p/2781595.html