Mina传输大数组,多路解码,粘包问题的处理

我的实际情况:

   1,传递的业务数据种类很多,这就决定了我们要用多路解码器,MINA的中文手册提供的是DemuxingProtocolCodecFactory;

  2,,有的数据长度达到8K,网上有资料说Mina在传输数据超过2K的情况下,会分片传输,因此要考虑如何来接收;

  3,若数据发送很快,或者网络状况不佳,很容易出现粘包的情况,这也是要解决的问题。

1)针对多路解码:

编码器:

   将编码器继承MessageEncoder<T>,T是你编码的对象的类,此中我是要编码Requstwork类;其中GetBytes()是我自己定义的将对象的数据组成字节数组的函数;

public class RequstNetworkEncoder implements MessageEncoder<RequstNetwork>{
    @Override
    public void encode(IoSession ioSession, RequstNetwork requstNetwork, ProtocolEncoderOutput out)
            throws Exception {
        if (requstNetwork != null) {
            byte[] bytes1 = GetBytes(requstNetwork);
            int capacity = bytes1.length;
            IoBuffer buffer = IoBuffer.allocate(capacity, false);
            buffer.setAutoExpand(true);
            buffer.put(bytes1);           
            buffer.flip();
            out.write(buffer);
        }
    }
}

对应的解码器:

public class RequstNetworkDecoder implements MessageDecoder {
    @Override
    public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) {
        if(ioBuffer.remaining()<2){
            //还没有达到不同数据的标志位的地方
            return MessageDecoderResult.NEED_DATA;
        }
        else{
            ioBuffer.position(1);
            byte b=ioBuffer.get();
            if (b==(此处为区分不同数据的标志)){  
                return  MessageDecoderResult.OK;

            }
            else{
                return MessageDecoderResult.NOT_OK;
            }
        }
    }

    @Override
    public MessageDecoderResult decode(IoSession ioSession, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        RequstNetworkReply reply=new RequstNetworkReply();
       //自己解码的过程
        out.write(reply);
        return MessageDecoderResult.OK;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }
}

编解码工厂:

public class MyProtocolCodecFactory extends DemuxingProtocolCodecFactory {

    public MyProtocolCodecFactory(){
        super.addMessageEncoder(RequstNetwork.class,RequstNetworkEncoder.class);
        super.addMessageDecoder(RequstNetworkDecoder.class);

    }
}

针对大数组的传输和粘包,修改了一下网上的做法:

public class RequestPlanDecoder extends CumulativeProtocolDecoder {

    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");

    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
      
        Context ctx =getContext(session);//获取session 的context

        long matchCount=ctx.getMatchLength();//目前已获取的数据
        long length=ctx.getLength();//数据总长度
        IoBuffer buffer=ctx.getBuffer();//数据存入buffer

        //第一次取数据
        if(length==0){
            length=in.getLong();
            //保存第一次获取的长度
            ctx.setLength(length);
            matchCount=in.remaining();
           ctx.setMatchLength(matchCount);
        }
        else{
            matchCount+=in.remaining();
            ctx.setMatchLength(matchCount);
        } 
        ctx.setMatchLength(matchCount); 
        if (in.hasRemaining()) {
           // 如果buff中还有数据 
           buffer.put(in);
           // 添加到保存数据的buffer中 
           if (matchCount >= length) { 
             ////自己解码的部分/////// 
              if(buffer.remaining() > 0) {
                 //如果读取一个完整包内容后还粘了包,就让父类再调用一次,进行下一次解析 
                 IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
                 temp.put(buffer);
                 temp.flip();
                 in.sweep();
                 //清空数据 
                 in.put(temp);
              } 
             ctx.reset();//清空
             return true;  
           } else { 
             ctx.setBuffer(buffer); 
             return false; 
           } 
      } 
   return false; 
} 

//获取session的context 
public Context getContext(IoSession session) { 
    Context ctx = (Context) session.getAttribute(CONTEXT);
    if (ctx == null) { 
      ctx = new Context();
      session.setAttribute(CONTEXT, ctx);
    } 
    return ctx; 
} 
/** * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析 **/
 private class Context {

   public IoBuffer buffer; 

   public long length = 0; 

   public long matchLength = 0; 

   public Context() { 
       buffer = IoBuffer.allocate(1024).setAutoExpand(true);
   }
   public void setBuffer(IoBuffer buffer) {
       this.buffer = buffer; 
   } 
   public void setLength(long length) {
       this.length = length; 
   }
   public void setMatchLength(long matchLength) { 
       this.matchLength = matchLength;
   } 
   public IoBuffer getBuffer() { 
       return buffer;
   }
   public long getLength() { 
       return length; 
   } 
   public long getMatchLength() {
       return matchLength;
    }
   public void reset(){ 
     this.buffer.clear();
     this.length=0; 
     this.matchLength=0; 
   }
 
  } 
} 

我想让传大数组的解码器能和其他解码器一起共用,通过查看官方的MINA API直到MessageDecoder就是继承了CumulativeProtocolDecoder,于是就做了如下结合:

public class RequestPlanDecode implements MessageDecoder  {
    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");
    @Override
    public MessageDecoderResult decodable(IoSession ioSession, IoBuffer in) {
        if(in.remaining()<2){
            return MessageDecoderResult.NEED_DATA;
        }
        else{
            byte b1=in.get();
            byte b2=in.get();
            if(b2==<span style="font-family: Arial, Helvetica, sans-serif;">(此处为区分不同数据的标志)</span>){
                return MessageDecoderResult.OK;
            }
            else {
                return  MessageDecoderResult.NOT_OK;
            }
        }
    }

    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        //=================结合CumulativeProtocolDecoder================//
        Context ctx =getContext(session);//获取session  的context

        long matchCount=ctx.getMatchLength();//目前已获取的数据
        long length=ctx.getLength();//数据总长度
        IoBuffer buffer=ctx.getBuffer();//数据存入buffer

        //第一次取数据
        if(length==0){
            length=in.getLong();
            //保存第一次获取的长度
            ctx.setLength(length);
            matchCount=in.remaining();
            ctx.setMatchLength(matchCount);
        }
        else{
            matchCount+=in.remaining();
            ctx.setMatchLength(matchCount);
        }
        if (in.hasRemaining()) {// 如果buff中还有数据
            buffer.put(in);// 添加到保存数据的buffer中
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
               ////自己解码的部分///////

                if(buffer.remaining() > 0) {////解决粘包
                    IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
                    temp.put(buffer);
                    temp.flip();
                    in.sweep();
                    in.put(temp);
                }
                ctx.reset();
                return MessageDecoderResult.OK;

            } else {
                ctx.setBuffer(buffer);
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.NEED_DATA;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput)
            throws Exception {

    }
    /////////////////////////////////////结合CumulativeProtocolDecoder/////////////////////////////////////////////////
    //获取session的context
    public Context getContext(IoSession session) {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }
    /**
     * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析
     *
     */
    private class Context {
        public IoBuffer buffer;
        public long length = 0;
        public long matchLength = 0;

        public Context() {
            buffer = IoBuffer.allocate(1024).setAutoExpand(true);
        }

        public void setBuffer(IoBuffer buffer) {
            this.buffer = buffer;
        }

        public void setLength(long length) {
            this.length = length;
        }
        public void setMatchLength(long matchLength) {
            this.matchLength = matchLength;
        }

        public IoBuffer getBuffer() {

            return buffer;
        }

        public long getLength() {
            return length;
        }

        public long getMatchLength() {
            return matchLength;
        }

        public  void reset(){
            this.buffer.clear();
            this.length=0;
            this.matchLength=0;
        }
    }

}
原文地址:https://www.cnblogs.com/bkyliufeng/p/6249317.html