Netty(3)Time protocol

本节介绍TIME协议。该协议与前边的discard协议和echo协议的不同点在于:
1、服务端主动发送消息给到客户端,所以需要channelActive()方法。
2、发送的消息是4个字节的int
3、不接收来自客户端的任何请求,所以不需要channelRead()方法。
4、一旦消息发送完毕,就关闭该connection。

一、server端

TimeServerHandler.java

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {//(1)
        final ByteBuf time = ctx.alloc().buffer(4);//(2)
        time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L));
        log.info("{}",(int)(System.currentTimeMillis()/1000L+2208988800L));
        final ChannelFuture f = ctx.writeAndFlush(time);//(3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                assert f == future;
                ctx.close();
            }
        });//(4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    
}

1、当客户端与服务端建立了connection时,立即触发服务端的channelActive()方法。该方法内写一个4个字节的int,用于表示当前时间。
2、分配一个4字节的buffer,来存储int。采用ChannelHandlerContext.alloc()获取当前的ByteBufAllocator,来分配一个新的buffer。
3、先说下,java.nio.flip():调用flip()以后,读/写指针position知道缓冲区头部,并设置了最多能读出的已写入数据长度(不是buffer的capacity)。
那这里的NIO,为啥在发送消息之前,不调用flip()呢?因为netty中的ByteBuf没有该方法。为啥没有?因为,ByteBuf有2个指针;一个是read另一个是write。当你调用buffer.writeXXX(...)时,write index会增加,而read index不变。反之,当你buffer.readXXX()时,read index会增加,而write index不变。read index代表buffer的当前未读的位置(没read过,就是0,read了1个字节就是1),而write index代表以后写入的位置(可读字节的结束位置)。
另外,在注意一点,ChannelHandlerContext.write(和writeAndFlush())方法返回ChannelFuture。该ChannelFuture代表着异步,意味着write或writeAndFlush不会立即执行。例如:下边的代码可能会先执行close()然后在执行writeAndFlush

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();

因此,应该在ChannelFuture完成后,写close()方法。当write完成后会ChannelFuture会通知其listeners。


4、那我们该怎么写呢?很简单,为该返回的ChannelFuture增加一个ChannelFutureListener。

f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        assert f == future;
        ctx.close();
    }
});

这里,我们创建了一个匿名的ChannelFutureListener来关闭该Channel,当write完成后。另外一种方法是,可简化写成预定义的:

f.addListener(ChannelFutureListener.CLOSE);

TimeServer.java,可以拷贝之前的DiscardServer.java,将添加的handler改成TimeServerHandler即可。

不管用哪种方法,close()方法可能不会立即关闭connection,他会返回一个ChannelFuture

二、客户端

因为int类型的时间戳,看不懂,只能借助程序翻译,因此,不像DISCARD和ECHO server可以不需要client。本节介绍编写client。client与server的最大且唯一的不同在于Bootstrap和Channel的实现类不同。
server:使用ServerBootstrap、NioServerSocketChannel
client:使用Bootstrap、NioSocketChannel
发现:server中的都带server,client中均去掉了server。

TimeClient.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();//(1)
            b.group(workerGroup);//(2)
            b.channel(NioSocketChannel.class);//(3)
            b.option(ChannelOption.SO_KEEPALIVE, true);//(4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            //start the client.
            ChannelFuture f = b.connect(host,port).sync();//(5)
            //wait until the connection is closed.
            f.channel().closeFuture().sync();
            log.info("client channel is closed.");
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
    
}

1、Bootstrap类似ServerBootstrap,但其只能应用于客户端或者无连接通信下的channel,例如:UDP,可实现在预先不建立连接的情况下,实现广播。
2、如果只声明了一个EventLoopGroup,那么该group既作为boss group又作为worker group。
3、客户端使用NioSocketChannel,服务端使用NioServerSocketChannel。
4、客户端中没有childOption()。
5、客户端中使用connect()而不是使用bind()方法。

如你所见,客户端与服务端的启动代码并无多大区别。下边写handler,来接收4字节整数,翻译为可读格式的日期,最后关闭connection。

TimeClientHandler.java

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import cn.hutool.core.date.DateUtil;
@Slf4j
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L)*1000L;
            log.info("{}",DateUtil.date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();//(1)
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

1、因为不用了(不传递给其他handler,因为没有),所以释放掉。

测试结果:
客户端:

22:09:03.324 [nioEventLoopGroup-2-1] 2018-09-12 22:09:03
22:09:03.437 [main] client channel is closed.

服务端:

22:09:03.276 [nioEventLoopGroup-3-1] -549217153

说明:本例有时会抛出IndexOutOfBoundsException异常,将在下一节讨论。

原文地址:https://www.cnblogs.com/yaoyuan2/p/9630965.html