编解码器

  编解码器的作用是将原始字节数据与自定义的消息对象进行互转。编码器负责处理“出站”数据。

  解码器

    解码器负责解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。

    解码器有三种类型:解码字节到消息;解码消息到消息以及解码消息到字节。

    ByteToMessageDecoder

      decode(ChannelHandlerContext, ByteBuf, List<Object>):这个方法需要自己实现的抽象方法,作用是将ByteBuf数据解码成其他形式的数据

      decodeLast(ChannelHandlerContext, ByteBuf, List<Object>)

    ReplayingDecoder

      ReplayingDecoder是ByteToMessageDecoder的一种特殊的抽象基类,读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查,若ByteBuf中有足够的字节,则正常读取,否则就是停止解码。不是所有的操作都被ByteBuf支持,若有一个不支持的就会抛出DecoderException;ByteBuf.readableBytes()大部分不会返回期望值

  

    MessageToMessageDecoder

      decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Excetopn;

    

  编码器

    编码器有两种类型:消息对象编码成消息对象和消息对象编码成字节码

    Netty也提供了两个抽象类:MessageToByteEncoder和MessageToMessageEncoder。需要重写encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception;

  

  byte-to-byte编解码器

    Netty提供了ByteArrayEncoder和ByteArrayDecoder两个类。

public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf>{
  protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception{
    byte[] array = new byte[msg.readableBytes()];
    msg.getBytes(0, array);
   
    out.add(array);
  }
}

@Sharable
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]>{
  protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception{
    out.add(Unpooled.wrappedBuffer(msg));
  }
}

  ByteToMessageCodec

    ByteToMessageCodec用来处理byte-to-message和message-to-byte。ByteToMessageCodec是一种组合,等同于ByteToMessageDecoder和MessageToByteEncoder的组合。MessageToByteEncoder有两个抽象方法:

      encode(ChannelHandlerContext, I, ByteBuf) //编码

      decode(ChannelHandlerContext, ByteBuf,  List<Object>) //解码

  MessageToMessageCodec

    MessageToMessageCodec用于message-to-message的编码和解码。可以看成是MessageToMessageDecoder和MessageToMessageEncoder的组合体。MessageToMessageCodec有两个抽象方法:

      encode(ChannelHandlerContext, OUTBOUNG_IN, List<Object>)

      decode(ChannelHandlerContext, INBOUND_IN, List<Object>)

  CombinedChannelDuplexHandler

    自定义编码器和解码器

public class CharCodec extends CombinedChannelDuplexHandler<Decoder, Encoder>{
  public CharCodec(){
    super(new Decoder(), new Encoder());
  }
}

  使用SSL/TLS创建安全的netty程序

    java提供了SslContext和SslEngine支持SSL/TLS。Netty提供了SslHandler,它扩展了Java的SslEngine。

public class SslChannelInitializer extends ChannelInitializer<Channel>{
  private final SSLContext context;
  private final boolean client;
  private final boolean startTls;

  public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) {
    this.context = context;
    this.client = client;
    this.startTls = startTls;
  }

  protected void initChannel(Channel ch) throws Exception{
    SSLEngine engine = context.createSSLEngine();
    engine.setUserClientMode(client);
    ch.pipeline().addFirst("ssl", new SslHander(engine, startTls));
  }
}

  SslHandler必须要添加到ChannelPipeline的第一个位置。

    SSL/TLS的方法:

      setHandshakeTimeout(long handshakeTimeout, TimeUnit unit):设置握手超时时间,ChannelFuture将得到通知

      setHandshakeTimeoutMillis(long handshakeTimeoutMillis):设置握手超时时间,ChannelFuture将会得到通知

      getHandshakeTimeoutMillis():获取握手超时时间值

      setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit):设置关闭通知超时事件,若超时,ChannelFuture会关闭失败

      setHandshakeTimeoutMillis(long handshakeTimeoutMillis):设置关闭通知超时时间,若超时,ChannelFuture会关闭失败

      getCloseNotifyTimeoutMillis():获取关闭通知超时时间

      handshakeFuture():返回完成握手后的ChannelFuture

      close():发送关闭通知请求关闭和销毁

    

    一个HTTP请求/响应消息可能不止一个,但最终都会包含LastHttpContent消息。FullHttpRequest和FullHttpResponse是Netty提供的两个接口,分别用来完成http请求和响应。

    Netty提供了HTTP请求和响应的编码器和解码器:

      HttpRequestEncoder:将HttpRequest或HttpContent编码成ByteBuf

      HttpRequestDecoder:将ByteBuf解码成HttpRequest和HttpContent

      HttpResponseEncoder:将HttpResponse或HttpContent编码成ByteBuf

      HttpResponseDecoder:将ByteBuf解码成HttpResponse和HttpContent

    

    HTTP消息聚合

      处理HTTP时可能接受HTTP消息片段,Netty需要缓冲直到接受完整个消息。要处理HTTP消息,Netty提供了HttpObjectAggregator。通过HttpObjectAggregator,Netty可以聚合HTPP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这样就消除了断裂消息,保证了消息的完整性。

    HTTP压缩

      Netty支持“gzip”和“deflate”。

protected void intiChannel(Channel ch) throws Exception{
  ChannelPipeline pipeline = ch.pipeline();
  if(client){
    pipeline.addLast("codec", new HttpClientCodec());
    pipeline.addLast("decompressor", new HttpContentDecompressor());
  }else{
    pipeline.addLast("codec", new HttpServerCodec());
    pipeline.addLast("decompressor", new HttpContentDecompressor());
  }
  pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));
}

  使用HTTPS

    

public class HttpsCodecInitializer extends ChannelInitializer<Channel>{
  private final SSLContext context;
  private final boolean client;

  public HttpsCodecInitializer(SSLContext context, boolean client){
    this.context = context;
    this.client = client;
  }

  protected void initChannel(Channel ch) throws Exception{
    SSLEngine engine = context.createSSLEngine();
    engine.setUseClientMode(client);
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addFirst("ssl", new SslHandler(engine));
    if(client)
      pipeline.addLast("codec", new HttpClientCodec());
   else
      pipeline.addLast("codec", new HttpServerCodec());
  }
}

  WebSocket

    Netty通过ChannelHandler对WebSocket进行支持。Netty支持如下WebSocket:

      BinaryWebSocketFrame:包含二进制数据

      TextWebSocketFrame:包含文本数据

      ContinuationWebSocketFrame:包含二进制数据或文本数据

      CloseWebSocketFrame:代表一个关闭请求,包含关闭状态码和短语

      PingWebSocketFrame:WebSocketFrame要求PongWebSocketFrame发送数据

      PongWebSocketFrame:WebSocketFrame要求PingWebSocketFrame响应

public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler());
  }

  public static final class TextFrameHanlder extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
    }
  }
 
  public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{
    protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception{
    }
  }

  public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{
   protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception{
   }
  }
}

  SPDY

    SPDY是Google开发的基于TCP的应用层协议。SPDY的定位:将页面加载时间减少50%;最大限度减少部署的复杂性;避免网站开发者改动内容

    SPDY技术实现:单个TCP连接支持并发的HTTP请求;压缩包头和去掉不必要的头部来减少当前HTTP使用的带宽;定义一个易实现,在服务器端高效的协议。通过减少边缘情况,定义易解析的消息格式来减少HTTP的复杂性;强制使用SSL;允许服务器在需要时发起对客户端的连接并推送数据。

    

  Netty用三种不同的ChannelHanlder处理闲置和超时连接:

    IdeStateHandler:当一个通道没有进行读写或运行了一段时间后发出IdleStateEvent

    ReadTimeoutHandler:在指定时间内没有接收到任何数据将抛出ReadTimeoutException

    WriteTimeoutHandler:在指定时间内写入数据将抛出WriteTimeoutException

public class IdleStateHandlerInitializer extends ChannelInitialzier<Channel>{
  protected void initChannel(Channel ch) throws Exception{
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));

    pipeline.addLast(new HeartbeatHandler());
  }

  public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{
    private static final ByteBuf HEARTBEAT_SEQUEUE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
      if(evt instanceof IdleStateEvent)
        ctx.writeAndFlush(HEARTBEAT_SEQUEUE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
      else
        super.userEventTriggered(ctx, evt);
    }
  }
}

  解码分隔符和基于长度的协议

    Netty提供两个类用于提取序列分割:

      DelimiterBasedFrameDecoder:接收ByteBuf由一个或多个分隔符拆分

      LineBasedFrameDecoder:接收ByteBuf以分隔线结束,如 ,

 

public class LineBaseHandlerInitializer extends ChannelInitializer<Channel>{
  protected void initChannel(Channel ch) throw Exception{
    ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1024), new FrameHandler());
  }

  public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{
    }
  }
}

  Netty提供了两个以长度为单位的解码器:FixedLengthFrameDecoder和LengthFieldBasedFrameDecoder。

   

public class LengthBasedInitializer extends ChannelInitializer<Channel>{
  protected void intiChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new LengthFileBasedFrameDecoder(65 * 1024, 0, 8)).addLast(new FrameHandler());
  }

  public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{
    }
  }
}

  写大数据

    Netty使用零拷贝写文件内容时通过DefaultFileRegion,ChannelHandlerContext和ChannelPipeline。

    

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
  File file = new File("test.txt");
  FileInputStream fis = new FileInputStream(file);
  FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
  Channel channel = ctx.channel();
  channel.writeAndFlush(region).addListener(new ChannelFutureListener(){
    protected void operationComplelte(ChannelFuture future) throws Exception{
      if(!future.isSuccess())
        Throwable cause = future.cause();
    }
  });
}

  Netty提供了ChunkedWriteHandler,允许通过处理ChunkedInput来写大的数据块。ChunkedFile,ChunkedNioFile,ChunkedStream和ChunkedNioStream实现了ChunkedInput。

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel>{
  private final File file;

  public ChunkedWriteHandlerInitializer(File file){
    this.file = file;
  }

  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new ChunkedWriteHandler()).addLast(new WriteStreamHandler());
  }

  public final class WriteStreamHandler extends ChannelInboundHandlerAdapter{
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
      super.channelActive(ctx);
      ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
    }
  }
}

  序列化 

    Java提供了ObjectInputStream和ObjectOutputStream等序列化接口。

    io.netty.handler.codec.serialization提供了如下接口:CompatibleObjectEncoder,CompactObjectInputStream,CompactObjectOutputStream,ObjectEncoder,ObjectDecoder,ObjectEncoderOutputStream和ObjectDecoderInputStream。

    JBoss Marshalling序列化的速度是JDK的3倍且序列化的结构更紧凑。

    io.netty.handler.codec.marshalling提供如下接口:CompatibleMarshallingEncoder,CompatibleMarshallingDecoder,MarshallingEncoder和MarshallingDecoder。

pubilc class MarshallingInitializer extends ChannelInitializer<Channel>{
  private final MarshallerProvider marshallerProvider;
  private final UnmarshallerProvider unmarshallerProvider;

  public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider){
    this.marshallerProvider = marshallerProvider;
    this.unmarshallerProvider = unmarshallerProvider;
  }

  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider)).addLast(new MarshallingEncoder(marshallerProvider)).addLast(new ObjectHandler());
  }

  public final class ObjectHandler extends SimpleChannelInboundHander<Serializable>{
    protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{
    }
  }
}

    ProtoBuf是Google开源的一种编码和解码技术,作用是使序列化数据更高效。

    io.netty.handler.codec.protobuf提供了:ProtoBufDecoder,ProtobufEncoder,ProtobufVarint32FrameDecoder和ProtibufVarint32LengthFieldPrepender

public class ProtoBufInitializer extends ChannelInitializer<Channel>{
  private final MessageLite lite;

  public ProtoBufInitializer(MessageLite lite){
    this.lite = lite;
  }

  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufEncoder()).addLast(new ProtobufDecoder(lite)).addLast(new ObjectHandler());
  }

  public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable>{
    protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{
    }
  }
}

  

    

原文地址:https://www.cnblogs.com/forerver-elf/p/7213842.html