解决netty tcp自定义消息格式粘包/拆包问题

项目地址:https://github.com/KouReal/netty-study-0816

参考:https://www.cnblogs.com/sidesky/p/6913109.html

使用到的开源库:Reflections,用于根据注解找出某个包下的所有类(Class)

https://stackoverflow.com/questions/13128552/how-to-scan-classes-for-annotations

https://github.com/ronmamo/reflections

<dependency>
    <groupId>org.reflections</groupId>
    <artifactId>reflections</artifactId>
    <version>0.9.11</version>
</dependency>
/**
 * Lazily built registry of the protocols known to the codec.
 *
 * <p>Protocol ids are taken from the Integer-valued fields declared on {@code Header};
 * message classes are discovered by scanning the {@code protocol} package for types
 * annotated with {@code MyMessage}, whose annotation value must equal the name of one
 * of those {@code Header} fields.
 *
 * <p>The registry is populated on first use. Initialisation is guarded by a lock and a
 * volatile flag so that concurrent callers never observe a partially filled map.
 */
public class ProtocolMap {
    private static final Logger logger = LoggerFactory.getLogger(ProtocolMap.class);
    /** Protocol id -> name of the Header field that declares it. */
    private static final Map<Integer, String> pmap_name = new HashMap<>();
    /** Protocol id -> annotated message class; the value is null when no class is bound to the id. */
    private static final Map<Integer,Class<?>> pmap = new HashMap<>();
    /** Set to true only after setpmap() has completed successfully. */
    private static volatile boolean loaded = false;

    /**
     * Populates the registry exactly once.
     *
     * @throws ProtocolException if a {@code MyMessage} annotation names an undefined protocol
     */
    private static void ensureLoaded() throws ProtocolException {
        if (loaded) {
            return;
        }
        synchronized (ProtocolMap.class) {
            if (!loaded) {
                setpmap();
                // Published last: readers that see 'true' also see every map entry.
                loaded = true;
            }
        }
    }

    /**
     * Reads one Header field as a protocol id.
     *
     * @return the Integer value of the field, or null if the field cannot be read or
     *         does not hold an Integer (such fields are skipped by the caller)
     */
    private static Integer readProtocolId(Field f, Header header) {
        f.setAccessible(true);
        try {
            Object value = f.get(header);
            if (value instanceof Integer) {
                return (Integer) value;
            }
            logger.warn("skip Header field {}: value is not an Integer", f.getName());
        } catch (IllegalArgumentException | IllegalAccessException e) {
            logger.error("cannot read Header field {}", f.getName(), e);
        }
        return null;
    }

    /**
     * Builds both lookup maps from the Header fields and the annotated message classes.
     * Must only be called while holding the class lock (see ensureLoaded()).
     *
     * @throws ProtocolException if an annotated class refers to a protocol name that is
     *         not declared on Header
     */
    private static void setpmap() throws ProtocolException{
        Header header = new Header();
        Field[] header_fs = header.getClass().getDeclaredFields();
        Map<String, Integer> namemap = new HashMap<>();
        for (Field f : header_fs) {
            Integer id = readProtocolId(f, header);
            if (id == null) {
                continue;
            }
            namemap.put(f.getName(), id);
            pmap_name.put(id, f.getName());
        }

        logger.info("setpmap:{}",namemap);

        Reflections reflections = new Reflections("protocol");
        Set<Class<?>> msg_cls_set = reflections.getTypesAnnotatedWith(MyMessage.class);
        logger.info("class sets:{}",msg_cls_set);
        for (Class<?> msg_cls : msg_cls_set) {
            MyMessage anno = msg_cls.getAnnotation(MyMessage.class);
            String name = anno.value();
            if(!namemap.containsKey(name)){
                throw new ProtocolException("协议class:"+msg_cls+"的MyMessage注解值:【"+name+"】对应的协议未定义");
            }
            pmap.put(namemap.get(name), msg_cls);
        }

        // Every declared id gets an entry, even when no message class is bound to it.
        for (Integer id : namemap.values()) {
            if(!pmap.containsKey(id)){
                pmap.put(id, null);
            }
        }
    }

    /**
     * Tells whether the given id is a protocol id declared on Header.
     *
     * @param id candidate protocol id
     * @return true if the id is declared, false otherwise
     * @throws ProtocolException if the registry cannot be built
     */
    public static boolean checkprotocol(Integer id) throws ProtocolException{
        ensureLoaded();
        return pmap_name.containsKey(id);
    }

    /**
     * Looks up the message class bound to a protocol id.
     *
     * @param id protocol id
     * @return the annotated message class, or null when the id is unknown or has no class bound
     * @throws ProtocolException if the registry cannot be built
     */
    public static Class<?> getclass(Integer id) throws ProtocolException{
        ensureLoaded();
        return pmap.get(id);
    }
}
/**
 * Decodes length-prefixed frames of the form
 * {@code [int protocol id][int payload length][payload bytes]} into {@code LenPreMsg} objects.
 *
 * <p>If the bytes at the current position do not start with a known protocol id, the
 * decoder drops one byte at a time until a known id is found. If a frame is not yet
 * complete, the reader index is rewound to the frame start and decoding resumes when
 * more bytes arrive.
 */
public class LenPreMsgDecoder extends ByteToMessageDecoder{
    private static final Logger logger = LoggerFactory.getLogger(LenPreMsgDecoder.class);
    /** Size in bytes of the int length field that follows the protocol id. */
    private static final int LEN_FIELD_BYTES = 4;

    /**
     * Tries to decode one frame from {@code in}.
     *
     * @param ctx channel context (unused)
     * @param in  accumulated inbound bytes
     * @param out receives at most one decoded {@code LenPreMsg} per call
     * @throws Exception if the protocol registry or the deserializer fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        logger.info("decode: readinx:{},readable:{},writeidx:{}",in.readerIndex(),in.readableBytes(),in.writerIndex());
        if(in.readableBytes()<LenPreMsg.BASE_PRE_LEN){
            logger.info("readable too little");
            return ;
        }
        int beginindex;
        int protocol_id;
        // Resynchronise: slide forward one byte at a time until a known protocol id is found.
        while(true){
            beginindex = in.readerIndex();
            logger.info("before readInt:,readerindex:{},readable:{}",in.readerIndex(),in.readableBytes());
            protocol_id = in.readInt();
            logger.info("protocolid:{}",protocol_id);
            if(ProtocolMap.checkprotocol(protocol_id)){
                break;
            }
            // Not a frame start: drop exactly one byte and try again.
            in.readerIndex(beginindex + 1);
            if(in.readableBytes()<LenPreMsg.BASE_PRE_LEN){
                return;
            }
        }

        // Guard the length read in case the header check above covered fewer than 8 bytes.
        if(in.readableBytes()<LEN_FIELD_BYTES){
            in.readerIndex(beginindex);
            return ;
        }
        int len = in.readInt();
        logger.info("read len:{}",len);

        if(len<0){
            // A negative length cannot be a real frame and would make the array
            // allocation below throw. Treat the id match as accidental and resume
            // scanning one byte past the supposed frame start.
            logger.warn("negative length {} for protocol {}, resync",len,protocol_id);
            in.readerIndex(beginindex + 1);
            return ;
        }
        if(len==0){
            //heartbeat
            out.add(new LenPreMsg(protocol_id, 0, null));
            return ;
        }
        logger.info("after read length,readablebytes:{}",in.readableBytes());
        if(in.readableBytes()<len){
            // Payload incomplete: rewind to the frame start and wait for more bytes.
            // NOTE(review): len has no upper bound here; a peer can make the buffer grow
            // without limit by announcing a huge length. Consider a maximum frame size.
            in.readerIndex(beginindex);
            return ;
        }
        byte[] data = new byte[len];
        in.readBytes(data);
        // NOTE(review): getclass may return null for an id that has no message class
        // bound; confirm how deserializeWithProtostuff behaves in that case.
        Class<?> protocol_cls = ProtocolMap.getclass(protocol_id);
        logger.info("start serialize,cls:{}",protocol_cls);
        Object obj = SerializeUtil.deserializeWithProtostuff(data, protocol_cls);
        logger.info("decoder get obj:{}",obj);
        out.add(new LenPreMsg(protocol_id, len, obj));
    }
}
 

// Concurrency smoke test: starts 4 threads, each of which sends 10 messages to the
// server through cli.sendtoserver, cycling over three sample messages.
// NOTE(review): the snippet is cut off here; the method's closing brace is not shown.
@Test
    public void test1(){
        // One sample payload per protocol exercised by the test.
        RegDiscover regDiscover = new RegDiscover("httpsdsfsafsaerver", "addsdfsafsr");
        RpcRequest rpcRequest = new RpcRequest("734209479jdfjsfdjsfip794079014",
                "adfjashfpaoh73497498d80fh32", 
                "asjdfaiohpgh", 
                "djas;fja938u2893", 
                "0f8y0409y3", 
                "7398ujfhhhe9hf93h2f9h");
        RegService regService = new RegService("customerfdsafsadf", "xxxdsfadsfasdf");
        // Each message is built with the literal 1 as its second constructor argument.
        List<LenPreMsg> msgs = asList(new LenPreMsg(Header.reg_discover, 1, regDiscover),
                new LenPreMsg(Header.rpc_request, 1, rpcRequest),
                new LenPreMsg(Header.reg_addservice, 1, regService)); 
//        List<Integer> protocols = asList(Header.reg_discover,Header.rpc_request,Header.reg_addservice);
        for(int i=0;i<4;++i){
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    // This i belongs to run() and shadows the outer loop counter.
                    for(int i=0;i<10;++i){
                        try {
                            // i%3 rotates through the three sample messages.
                            cli.sendtoserver(msgs.get(i%3));
                        } catch (Exception e) {
                            // A failed send is only printed; the thread keeps sending.
                            e.printStackTrace();
                        }
                    }
                    
                }
            }).start();
        }
原文地址:https://www.cnblogs.com/CreatorKou/p/11365378.html