netty: marshalling传递对象,传输附件GzipUtils

netty: marshalling传递对象,传输附件GzipUtils

  前端与服务端传输文件时,需要双方需要进行解压缩,也就是Java序列化。可以使用java进行对象序列化,netty去传输,但java序列化硬伤太多(无法跨语言,码流太大,性能太低),所以最好使用主流的编辑码框架来配合netty使用。此处使用的是JBossMarshalling框架。
用到的包:

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
	<dependency>
	    <groupId>io.netty</groupId>
	    <artifactId>netty-all</artifactId>
	    <version>5.0.0.Alpha2</version>
	</dependency>
	
	<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
	<dependency>
	    <groupId>org.jboss.marshalling</groupId>
	    <artifactId>jboss-marshalling</artifactId>
	    <version>2.0.0.CR1</version>
	</dependency>
    
    <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
	<dependency>
	    <groupId>org.jboss.marshalling</groupId>
	    <artifactId>jboss-marshalling-serial</artifactId>
	    <version>2.0.0.CR1</version>	    
	</dependency>

  

用到的压缩包工具类:

gziputils.java

public class GzipUtils {

	 public static byte[] gzip(byte[] data) throws Exception{
	        ByteArrayOutputStream bos = new ByteArrayOutputStream();
	        GZIPOutputStream gzip = new GZIPOutputStream(bos);
	        gzip.write(data);
	        gzip.finish();
	        gzip.close();
	        byte[] ret = bos.toByteArray();
	        bos.close();
	        return ret;
	    }
	    
	    public static byte[] ungzip(byte[] data) throws Exception{
	        ByteArrayInputStream bis = new ByteArrayInputStream(data);
	        GZIPInputStream gzip = new GZIPInputStream(bis);
	        byte[] buf = new byte[1024];
	        int num = -1;
	        ByteArrayOutputStream bos = new ByteArrayOutputStream();
	        while((num = gzip.read(buf, 0 , buf.length)) != -1 ){
	            bos.write(buf, 0, num);
	        }
	        gzip.close();
	        bis.close();
	        byte[] ret = bos.toByteArray();
	        bos.flush();
	        bos.close();
	        return ret;
	    }
	    
	    public static void main(String[] args) throws Exception{
	        
	        //读取文件
	        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
	        File file = new File(readPath);  
	        FileInputStream in = new FileInputStream(file);  
	        byte[] data = new byte[in.available()];  
	        in.read(data);  
	        in.close();  
	        
	        System.out.println("文件原始大小:" + data.length);
	        //测试压缩
	        
	        byte[] ret1 = GzipUtils.gzip(data);
	        System.out.println("压缩之后大小:" + ret1.length);
	        
	        byte[] ret2 = GzipUtils.ungzip(ret1);
	        System.out.println("还原之后大小:" + ret2.length);
	        
	        //写出文件
	        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
	        FileOutputStream fos = new FileOutputStream(writePath);
	        fos.write(ret2);
	        fos.close();        
	        
	        
	    }
}

  

Request.java类

public class Request implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	private String id;
	private String name;
	private String requestMessage;
	private byte[] attachment;
	
	public String getId() {
		return id;
	}
	
	public void setId(String id) {
		this.id = id;
	}
	
	public String getName() {
		return name;
	}
	
	public void setName(String name) {
		this.name = name;
	}
	
	public String getRequestMessage() {
		return requestMessage;
	}
	
	public void setRequestMessage(String requestMessage) {
		this.requestMessage = requestMessage;
	}
	
	public byte[] getAttachment() {
		return attachment;
	}
	
	public void setAttachment(byte[] attachment) {
		this.attachment = attachment;
	}	

}

  

Response.java类

public class Response implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	private String id;
	
	private String name;
	
	private String responseMessage;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getResponseMessage() {
		return responseMessage;
	}

	public void setResponseMessage(String responseMessage) {
		this.responseMessage = responseMessage;
	}	

}

  

MarshallingCodeCFactory.java

序列号编码解码类

public final class MarshallingCodeCFactory {

	/**
	 * 解码器
	 * @return
	 */
	public static MarshallingDecoder buildMarshallingDecoder() {
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
		//构建MarshallingDecoder对象,两个参数分别为provider和消息序列化后的最大长度
		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024*1024*1);
		return decoder;
		
	}
	
	/**
	 * 编码器
	 * @return
	 */
	public static MarshallingEncoder buildMarshallingEncoder() {
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
		//构建MarshallingEncoder对象,参数为provider;
		MarshallingEncoder encoder = new MarshallingEncoder(provider);
		return encoder;
		
	}
}

  

开始开发client,server功能

server.java

public class Server {

	
	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup boss = new NioEventLoopGroup();
		EventLoopGroup worker = new NioEventLoopGroup();
		ServerBootstrap b = new  ServerBootstrap();
		b.group(boss, worker)
		.channel(NioServerSocketChannel.class)
		.option(ChannelOption.SO_BACKLOG, 1024)
		.handler(new LoggingHandler(LogLevel.INFO))
		.childHandler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				// TODO Auto-generated method stub
				//设置编码解码
				ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
				ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
				ch.pipeline().addLast(new ServerHandler());
			}
		});
		ChannelFuture cf = b.bind(8765).sync();
		
		cf.channel().closeFuture().sync();
		boss.shutdownGracefully();
		worker.shutdownGracefully();
	}
}

  

serverHandler.java

需要继承ChannelHandlerAdapter类

public class ServerHandler extends ChannelHandlerAdapter {

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// TODO Auto-generated method stub
		//super.exceptionCaught(ctx, cause);
		cause.printStackTrace();
		ctx.close();
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// TODO Auto-generated method stub
		//super.channelRead(ctx, msg);
		Request request = (Request) msg;
		System.out.println("Server: " + request.getId() + ","+request.getName()+","+request.getRequestMessage());
		
		//接收附件 写入文件
		byte[] attachment = GzipUtils.ungzip(request.getAttachment());		
		String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + request.getId() +".png";
		FileOutputStream outputStream = new FileOutputStream(path);
		outputStream.write(attachment);
		outputStream.close();
		
		//返回数据
		Response response = new Response();
		response.setId(request.getId());
		response.setName("response: " + request.getName());
		response.setResponseMessage("相应的内容: " + request.getRequestMessage());
		ctx.writeAndFlush(response);
		
	}

	
}

  

client.java类

public class Client {

	public static void main(String[] args) throws Exception {
	
		EventLoopGroup worker = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(worker)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				// TODO Auto-generated method stub
				//设置编码解码
				ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
				ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
				ch.pipeline().addLast(new ClientHandler());
			}
		});
		ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
		
		
		for(int i=0; i< 5; i++) {
			Request request = new  Request();
			request.setId(i  + "");
			request.setName( "pro"+ i);
			request.setRequestMessage("数据信息Client~Server:" + i);
			
			//发送附件
			String path = System.getProperty("user.dir")  + File.separatorChar + "resources" + File.separatorChar + "1.png";
			File file = new File(path);
			FileInputStream inputStream = new FileInputStream(file);
			byte[] data = new byte[inputStream.available()];
			inputStream.read(data);
			inputStream.close();
			request.setAttachment(GzipUtils.gzip(data));
			cf.channel().writeAndFlush(request);
		}
		
		System.out.println("user.dir: " + System.getProperty("user.dir")  + File.separatorChar + "resources" + File.separatorChar + "1.png" );
		
		cf.channel().closeFuture().sync();
		worker.shutdownGracefully();
	}
	
	
	
}

  

ClientHandler.java类

需要继承ChannelHandlerAdapter类

public class ClientHandler extends ChannelHandlerAdapter {

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// TODO Auto-generated method stub
		//super.exceptionCaught(ctx, cause);
		cause.printStackTrace();
		ctx.close();
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// TODO Auto-generated method stub
		//super.channelRead(ctx, msg);
		try {
			Response response = (Response) msg;
			System.out.println("Client : " + response.getId() + ","+response.getName()+","+response.getResponseMessage());
		} finally {
			// TODO: handle finally clause
			ReferenceCountUtil.release(msg);
		}
		
		
	}

	
}

  

目录如下:

原文地址:https://www.cnblogs.com/achengmu/p/10974808.html