Netty的Marshalling编解码器

1、编码与解码
    通常我们习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。反之,解码(Decode)称为反序列化(deserialization),它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

2、java序列化
        Java默认提供的序列化机制,需要序列化的Java对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。可以直接把Java对象作为可存储的字节数组写入文件,也可以传输到网络上。 

     2.1 java序列化的缺点      
        无法跨语言。

        序列化后码流太大。

        序列化性能太低。

3、其他编解码框架
        Java默认的序列化机制效率很低、序列化后的码流也较大,所以涌现出了非常多的优秀的Java序列化框架,例如:Protobuf(google)、Thrift(facebook)、JBoss Marshalling(JBoss)等等。  
    3.1 Protobuf(google)
        Protobuf是google开源的项目,全称Google Protocol Buffers.特点:

            1.结构化数据存储格式(xml,json等)

            2.高性能编解码技术

            3.语言和平台无关,扩展性好

            4.支持java,C++,Python三种语言。

    3.2 JBoss Marshalling(JBoss)
        JBoss Marshalling是一个java对象的序列化API包,修正了java自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容,

        同时又增加了一些可调的参数和附加特性,并且这些参数和特性可通过工厂类的配置。特点:

            1.可拔插的类解析器,提供更加便捷的类加载定制策略,通过一个接口即可实现定制。

            2.可拔插的对象替换技术,不需要通过继承的方式。

            3.可拔插的预定义类缓存表,可以减少序列化的字节数组长度,提升常用类型的对象序列化性能。

            4.无须实现java.io.Serializable接口

            5.通过缓存技术提升对象的序列化性能。

            6.使用非常简单

            代码实现

                类库:jboss-marshalling-1.3.0.CR9.jar jboss-marshalling-serial-1.3.0.CR9.jar

                下载地址:https://www.jboss.org/jbossmarshalling/downloads

                我所用的jar 包为 https://pan.baidu.com/s/1tm2EgYtDpTS5dejVnHhvPw 密码:vc8b 

1、客户端实体类

package com.xyq.netty.serial;

import java.io.Serializable;

public class Request implements Serializable{

private static final long serialVersionUID = 1L;

private int id;
private String name;
private String requestMessage;

public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}

}

2、服务器端实体类
package com.xyq.netty.serial;

import java.io.Serializable;

public class Response implements Serializable{

private static final long serialVersionUID = 1L;

private int id;
private String name;
private String responseMessage;

public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}

}

3、JBoss marshalling编解码工具类
package com.xyq.netty.serial;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public class MarshallingCodeCFactory {

public static MarshallingDecoder buliteMarshallingDecoder(){

//1、首先通过编组工具类的精通方法获取编组实例对象参数序列标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//2、创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//3、根据marshallerFactory和配置创建提供商
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//4、构建Netty的MarshallingDecoder对象,两个参数分别为提供商和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024*1024*1);
return decoder;
}

public static MarshallingEncoder buliteMarshallingEncoder(){
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//5、构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}

}

4、服务器端

package com.xyq.netty.serial;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {

public static void main(String[] args) throws Exception{
//1、定义两个线程组
EventLoopGroup pGroup = new NioEventLoopGroup();//一个是用于处理服务器端接收客户端连接的
EventLoopGroup cGroup = new NioEventLoopGroup();//一个是进行网络通信的(网络读写的)
try {
//2、创建辅助工具类,用于服务器通道的一系列配置
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {

//设置marshalling编码
ch.pipeline().addLast(MarshallingCodeCFactory.buliteMarshallingDecoder());
//设置marshalling解码
ch.pipeline().addLast(MarshallingCodeCFactory.buliteMarshallingEncoder());
//处理业务
ch.pipeline().addLast(new ServerHandler());
}
});

//4、进行绑定
ChannelFuture cf = bootstrap.bind(8765).sync();
System.out.println("server start ...");

//5、等待关闭
cf.channel().closeFuture().sync();
} finally{
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}

}

}

5、服务器处理类

package com.xyq.netty.serial;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {

//读取客户端发过来的信息
Request request = (Request) msg;
System.out.println("服务器接收到客户端的信息为 " + request.getId() + " " + request.getName());

//给客户端回应信息
Response response = new Response();
response.setId(request.getId());
response.setName(request.getName()+"res");
ctx.writeAndFlush(response);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
ctx.close();
}

}

6、客户端

package com.xyq.netty.serial;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

public static void main(String[] args) throws Exception{
//创建线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建工具类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {

//设置marshalling解码
ch.pipeline().addLast(MarshallingCodeCFactory.buliteMarshallingDecoder());
//设置marshalling编码
ch.pipeline().addLast(MarshallingCodeCFactory.buliteMarshallingEncoder());

ch.pipeline().addLast(new ClientHandler());
}
});

//4、建立连接
ChannelFuture cf = bootstrap.connect("127.0.0.1", 8765).sync();
System.out.println("Client connet ....");

//5、发送信息
for(int i=0; i<5; i++){
Request request = new Request();
request.setId(i);
request.setName("request" + i);
cf.channel().writeAndFlush(request);
}

//6、等待关闭
cf.channel().closeFuture().sync();
} finally{
group.shutdownGracefully();
}
}

}

7、客户端处理类

package com.xyq.netty.serial;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {

try {
//接收服务端的响应信息
Response response = (Response) msg;
System.out.println("客户端接收到服务端的响应消息为 " + response.getId() +" " + response.getName());
} finally{
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
ctx.close();
}
}

原文地址:https://www.cnblogs.com/651434092qq/p/11765581.html