真正实现Netty私有协议开发

      首先《Netty权威指南》私有协议开发那一章的样例代码是编译不通过的(但是这丝毫不影响本书的价值)
处理方案可以参考:http://www.itnose.net/detail/6112870.html

      另外这一章的私有协议开发案例也过于理想化,为什么这么说呢?如果说服务端或客户端有一方基于历史原因是用其他语言实现的呢,比如C,Java和C的int类型字节长度是不一样的
那LineBasedFrameDecoder就用不上了,让我们看一个实际的私有协议案例:

这个协议在C/++程序员看来是个再正常不过的了,尤其注意协议对长度字段是采用字符串方式描述的(最多支持9999),如果用Netty来实现,又该如何处理呢?

public class NettyMessage {
    
    private Header header;
    private Object body;
    
    //检验和
    private byte crcCode;
    
    public byte getCrcCode() {
        return crcCode;
    }
    public void setCrcCode(byte crcCode) {
        this.crcCode = crcCode;
    }
    public Header getHeader() {
        return header;
    }
    public void setHeader(Header header) {
        this.header = header;
    }
    public Object getBody() {
        return body;
    }
    public void setBody(Object body) {
        this.body = body;
    }
    
    public String toString(){
        return ToStringBuilder.reflectionToString(this);
    }
}
public class Header {
    
    //固定头
    private byte startTag;
    
    //命令码,4位
    private byte[] cmdCode;
    
    //版本 2位
    private byte[] version;

    private int length;
    
    public byte[] getVersion() {
        return version;
    }

    public void setVersion(byte[] version) {
        this.version = version;
    }

    public byte[] getCmdCode() {
        return cmdCode;
    }

    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    public byte getStartTag() {
        return startTag;
    }

    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }
    
    public String toString(){
        return ToStringBuilder.reflectionToString(this);
    }
}
public class MessageEncoder extends MessageToByteEncoder<NettyMessage>{

	@Override
	protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception {
		try{
			if(msg == null || msg.getHeader() == null){
				throw new Exception("The encode message is null");
			}
			
			out.writeByte(msg.getHeader().getStartTag());
			out.writeBytes(msg.getHeader().getCmdCode());
			
			//占位
			byte[] lengthBytes = new byte[]{0, 0, 0, 0};
			out.writeBytes(lengthBytes);
			
			out.writeBytes(msg.getHeader().getVersion());
			String body = (String)msg.getBody();
			int length = 0;
			if(body != null){
				byte[] bodyBytes = body.getBytes();
				out.writeBytes(bodyBytes);
				length = bodyBytes.length;
				
				if(Constants.CRCCODE_DEFAULT != msg.getCrcCode()){
					msg.setCrcCode(CRC8.calcCrc8(bodyBytes));
				}
			}
			
			//长度从int转换为byte[4]
			byte l1 = getIndexToByte(length, 3);
			byte l2 = getIndexToByte(length, 2);
			byte l3 = getIndexToByte(length, 1);
			byte l4 = getIndexToByte(length, 0);
			lengthBytes = new byte[]{l1, l2, l3, l4};
			out.setBytes(5, lengthBytes);
			
			out.writeByte(msg.getCrcCode());
		}catch(Exception e){
			e.printStackTrace();
			throw e;
		}
	}
	
	public static byte getIndexToByte(int i, int index){
		if(index == 0){
			return (byte)(i % 10);
		}else{
			int num = (int)Math.pow(10, index);
			return (byte)((i / num) % 10);
		}
	}

}

  

public class MessageDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        try{
            if(in.readableBytes() < 12){
                return;
            }
            in.markReaderIndex();    
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setStartTag(in.readByte());
            
            byte[] cmdCode = new byte[4];
            in.readBytes(cmdCode);
            header.setCmdCode(cmdCode);
            
            //长度从byte[4]转int
            byte[] lengthBytes = new byte[4];
            in.readBytes(lengthBytes);
            int length = toInt(lengthBytes);
            header.setLength(length);
            if(length < 0 || length > 10240){//过长消息或不合法消息
                throw new IllegalArgumentException("wrong message length");
            }
            
            byte[] version = new byte[2];
            in.readBytes(version);
            header.setVersion(version);
            
            if(header.getLength() > 0){
                if(in.readableBytes() < length + 1){
                    in.resetReaderIndex();
                    return;
                }
                byte[] bodyBytes = new byte[header.getLength()];
                in.readBytes(bodyBytes);
                message.setBody(new String(bodyBytes));
            }
            message.setCrcCode(in.readByte());
            message.setHeader(header);
            out.add(message);
        }catch(Exception e){
            e.printStackTrace();
            throw e;
        } 
    }

    public static int toInt(byte[] bytes){
        int value = 0;
        for(int i=0; i<bytes.length; i++){
            int num = (int)Math.pow(10, bytes.length - 1 - i);
            value += num * bytes[i];
        }
        return value;
    }
}

服务端代码:

public class NettyServer implements Runnable{
    
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    
    @Autowired
    Config config;
    
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChildChannelHandler());
            ChannelFuture f = b.bind(port).sync();
            logger.info("Push server started on port " + port);
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
            .addLast(new MessageDecoder())
            .addLast(new MessageEncoder())
        }
    }

    @Override
    public void run() {
        try {
            this.bind(config.getServerPort());
        } catch (Exception e) {
            logger.error(e.getMessage());
            System.exit(1);
        }
    }
}

客户端代码:

/**
 * 客户端
 * @author peng
 */
public class NettyClient {
    
    public void connect(String remoteServer, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChildChannelHandler());
            ChannelFuture f = b.connect(remoteServer,port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
    
    public static class ChildChannelHandler extends
            ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new MessageDecoder())
            .addLast(new MessageEncoder())
        }
    }
    
    public static void main(String[] args){
        try {
            new NettyClient().connect("127.0.0.1", 9080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结:关键在于Encoder和Decoder的编码实现

欢迎转载,转载请务必注明出处
原文地址:https://www.cnblogs.com/mzsg/p/5442527.html