netty: 将传递数据格式转为String,并使用分隔符发送多条数据

自定义分隔符,用:DelimiterBasedFrameDecoder类

ByteBuf转String,用StringDecoder类

参考代码:

// "$_" is the delimiter that terminates every message on the wire.
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());

// Handlers are appended in this order:
//   1. DelimiterBasedFrameDecoder - custom-delimiter framing (constructed with 1024 and the delimiter)
//   2. StringDecoder              - converts each framed ByteBuf into a String
//   3. ClientHandler              - processes the decoded data
sc.pipeline()
		.addLast(new DelimiterBasedFrameDecoder(1024, delimiter))
		.addLast(new StringDecoder())
		.addLast(new ClientHandler());

  

完整代码:

client代码

/**
 * Client entry point: connects to 127.0.0.1:8765, writes three "$_"-terminated
 * messages and then waits until the channel is closed.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if interrupted while waiting for the connect or for the channel to close
 */
public static void main(String[] args) throws InterruptedException {
		
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(worker)
			.channel(NioSocketChannel.class)
			.handler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel sc) throws Exception {
					// "$_" is the delimiter that terminates every message
					ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
					// DelimiterBasedFrameDecoder: custom-delimiter framing
					sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
					
					// Decode as text: turn each framed buf into a String
					sc.pipeline().addLast(new StringDecoder());
					
					sc.pipeline().addLast(new ClientHandler());
				}
			});
			// Connect to the server and block until the connection attempt completes
			ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
			// Each message carries the trailing "$_" delimiter
			cf.channel().writeAndFlush(Unpooled.copiedBuffer("aaa$_".getBytes()));
			cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbbbb$_".getBytes()));
			cf.channel().writeAndFlush(Unpooled.copiedBuffer("cccccccc$_".getBytes()));
			
			// Block until the channel is closed
			cf.channel().closeFuture().sync();
		} finally {
			// Shut the event loop down on every exit path: if connect()/sync() throws,
			// the original code never reached shutdownGracefully().
			worker.shutdownGracefully();
		}
		
	}

  

ClientHandler代码

/**
 * Client-side handler that prints every message it reads, prefixed with
 * {@code "Client: "}, and closes the channel when an exception is reported.
 */
public class ClientHandler extends ChannelHandlerAdapter {

	/**
	 * Prints the stack trace of the reported error and closes the context.
	 *
	 * @param ctx   context of the channel on which the error occurred
	 * @param cause the reported error
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Prints the received message to standard output.
	 *
	 * @param ctx context of the channel the message arrived on
	 * @param msg the received message; cast to {@code String} because the
	 *            pipeline is expected to have decoded the ByteBuf already
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try {
			System.out.println("Client: " + (String) msg);
		} finally {
			// Runs even when the cast above fails
			ReferenceCountUtil.release(msg);
		}
	}
}

  

Server代码:

/**
 * Server entry point: binds to port 8765 and blocks until the server channel is closed.
 * Each accepted channel gets "$_"-delimiter framing, String decoding and a ServerHandler.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if interrupted while waiting for the bind or for the channel to close
 */
public static void main(String[] args) throws InterruptedException {
		
		// Event loop group that waits for client connections
		EventLoopGroup boss = new NioEventLoopGroup();
		// Event loop group that handles the work of the connections
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			// Bootstrap helper used to configure and register the server
			ServerBootstrap b = new ServerBootstrap();
			b.group(boss, worker)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_BACKLOG, 1024)
			// NOTE(review): SO_SNDBUF/SO_RCVBUF are set with option(), i.e. on the listening
			// channel; if the intent is to size the buffers of accepted connections,
			// childOption() is probably what is wanted - confirm.
			.option(ChannelOption.SO_SNDBUF, 32*1024)
			.option(ChannelOption.SO_RCVBUF, 32*1024)
			.childHandler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel sc) throws Exception {
					// "$_" is the delimiter that terminates every message
					ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
					// DelimiterBasedFrameDecoder: custom-delimiter framing
					sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
					
					// Turn each framed buf into a String
					sc.pipeline().addLast(new StringDecoder());
					
					sc.pipeline().addLast(new ServerHandler());
				}
			});
			
			// Bind the listening port and block until the bind completes
			ChannelFuture cf = b.bind(8765).sync();
			// Block until the server channel is closed
			cf.channel().closeFuture().sync();
		} finally {
			// Shut both groups down on every exit path: if bind()/sync() throws,
			// the original code never reached the shutdown calls.
			boss.shutdownGracefully();
			worker.shutdownGracefully();
		}
	}

  

ServerHandler代码

/**
 * Server-side handler that prints every message it reads and answers each one
 * with a fixed, "$_"-terminated reply. Closes the channel when an exception is reported.
 */
public class ServerHandler extends ChannelHandlerAdapter {

	/**
	 * Prints the stack trace of the reported error and closes the context.
	 *
	 * @param ctx   context of the channel on which the error occurred
	 * @param cause the reported error
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Prints the received message and writes the fixed reply back.
	 *
	 * @param ctx context used to write and flush the reply
	 * @param msg the received message; cast to {@code String} because the
	 *            pipeline is expected to have decoded the ByteBuf already
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String received = (String) msg;
		System.out.println("Serer:" + received);

		// The reply ends with the "$_" delimiter and is sent as a ByteBuf
		byte[] replyBytes = "我是响应的数据$_".getBytes();
		ctx.writeAndFlush(Unpooled.copiedBuffer(replyBytes));
	}
}

  

原文地址:https://www.cnblogs.com/achengmu/p/10944694.html