netty实现长连接心跳检

主要逻辑

使用netty实现长连接,主要靠心跳来维持服务器端及客户端连接。

实现的逻辑主要是:

服务器端方面

1, 服务器在网络空闲操作一定时间后,服务端失败心跳计数器加1。

2, 如果收到客户端的ping心跳包,则清零失败心跳计数器,如果连续n次未收到客户端的ping心跳包,则关闭链路,释放资源,等待客户端重连。


客户端方面

1, 客户端网络空闲在一定时间内没有进行写操作时,则发送一个ping心跳包。

2, 如果服务器端未在发送下一个心跳包之前回复pong心跳应答包,则失败心跳计数器加1。

3, 如果客户端连续发送n(此处根据具体业务进行定义)次ping心跳包,服务器端均未回复pong心跳应答包,则客户端断开连接,间隔一定时间进行重连操作,直至连接服务器成功。

环境:netty5,tomcat7,jdk7,myeclipse

服务器端心跳处理类:

[java] view plain copy
 
  1. public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {   
  2.     private  final Logger log=Logger.getLogger(HeartBeatRespHandler.class);  
  3.        //线程安全心跳失败计数器  
  4.        private AtomicInteger unRecPingTimes = new AtomicInteger(1);  
  5.        @Override  
  6.        public void channelRead(ChannelHandlerContext ctx, Object msg)    
  7.                 throws Exception {    
  8.            NettyMessageProto message = (NettyMessageProto)msg;  
  9.            unRecPingTimes = new AtomicInteger(1);  
  10.            //接收客户端心跳信息  
  11.            if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_REQUEST){  
  12.                 //清零心跳失败计数器  
  13.                 log.info("server receive client"+ctx.channel().attr(SysConst.SERIALNO_KEY)+" ping msg :---->"+message);  
  14.                 //接收客户端心跳后,进行心跳响应  
  15.                 NettyMessageProto replyMsg = buildHeartBeat();  
  16.                 ctx.writeAndFlush(replyMsg);  
  17.             }else{  
  18.                 ctx.fireChannelRead(msg);  
  19.             }  
  20.         }  
  21.          
  22.          
  23.         /** 
  24.          * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
  25.          */  
  26.         @Override  
  27.         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
  28.             if (evt instanceof IdleStateEvent) {    
  29.                 IdleStateEvent event = (IdleStateEvent) evt;    
  30.                 if (event.state() == IdleState.READER_IDLE) {    
  31.                     /*读超时*/    
  32.                     log.info("===服务器端===(READER_IDLE 读超时)");  
  33.                     unRecPingTimes.getAndIncrement();   
  34.                   //客户端未进行ping心跳发送的次数等于3,断开此连接  
  35.                     if(unRecPingTimes.intValue() == 3){    
  36.                           
  37.                           ctx.disconnect();  
  38.                           System.out.println("此客户端连接超时,服务器主动关闭此连接....");  
  39.                           log.info("此客户端连接超时,服务器主动关闭此连接....");  
  40.                     }   
  41.                 } else if (event.state() == IdleState.WRITER_IDLE) {    
  42.                     /*服务端写超时*/       
  43.                     log.info("===服务器端===(WRITER_IDLE 写超时)");  
  44.                       
  45.                 } else if (event.state() == IdleState.ALL_IDLE) {    
  46.                     /*总超时*/    
  47.                     log.info("===服务器端===(ALL_IDLE 总超时)");    
  48.                 }    
  49.             }    
  50.         }  
  51.           
  52.          
  53.        /** 
  54.         * 创建心跳响应消息 
  55.         * @return 
  56.         */  
  57.        private NettyMessageProto buildHeartBeat(){  
  58.            HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_RESPONSE).build();  
  59.            NettyMessageProto message =NettyMessageProto.newBuilder().setHeader(header).build();  
  60.            return message;  
  61.        }  

客户端心跳处理类:
[java] view plain copy
 
  1. public class HeartBeatReqHandler extends ChannelHandlerAdapter {  
  2.     private  final Logger log=Logger.getLogger(HeartBeatReqHandler.class);  
  3.       
  4.     //线程安全心跳失败计数器  
  5.     private AtomicInteger unRecPongTimes = new AtomicInteger(1);  
  6.       
  7.     public void channelRead(ChannelHandlerContext ctx, Object msg)    
  8.             throws Exception {    
  9.         NettyMessageProto message = (NettyMessageProto)msg;    
  10.         //服务器端心跳回复  
  11.         if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_RESPONSE){  
  12.             //如果服务器进行pong心跳回复,则清零失败心跳计数器  
  13.             unRecPongTimes = new AtomicInteger(1);  
  14.             log.debug("client receive server pong msg :---->"+message);  
  15.         }else{  
  16.             ctx.fireChannelRead(msg);  
  17.         }  
  18.     }    
  19.       
  20.     /** 
  21.      * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
  22.      */  
  23.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
  24.         if (evt instanceof IdleStateEvent) {    
  25.             IdleStateEvent event = (IdleStateEvent) evt;    
  26.             if (event.state() == IdleState.READER_IDLE) {    
  27.                 /*读超时*/    
  28.                 log.info("===客户端===(READER_IDLE 读超时)");  
  29.             } else if (event.state() == IdleState.WRITER_IDLE) {    
  30.                 /*客户端写超时*/       
  31.                 log.info("===客户端===(WRITER_IDLE 写超时)");  
  32.                 unRecPongTimes.getAndIncrement();    
  33.                 //服务端未进行pong心跳响应的次数小于3,则进行发送心跳,否则则断开连接。  
  34.                 if(unRecPongTimes.intValue() < 3){    
  35.                     //发送心跳,维持连接  
  36.                     ctx.channel().writeAndFlush(buildHeartBeat()) ;   
  37.                     log.info("客户端:发送心跳");  
  38.                 }else{    
  39.                     ctx.channel().close();    
  40.                 }    
  41.             } else if (event.state() == IdleState.ALL_IDLE) {    
  42.                 /*总超时*/    
  43.                 log.info("===客户端===(ALL_IDLE 总超时)");    
  44.             }    
  45.         }    
  46.     }  
  47.           
  48.     private NettyMessageProto buildHeartBeat(){  
  49.         HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_REQUEST).build();  
  50.         NettyMessageProto  message = NettyMessageProto.newBuilder().setHeader(header).build();  
  51.         return message;  
  52.     }  
  53.       
  54.     /** 
  55.      * 异常处理 
  56.      */  
  57.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception{  
  58.         ctx.fireExceptionCaught(cause);  
  59.     }  
  60.   
  61. }  


[java] view plain copy
 
在CODE上查看代码片派生到我的代码片
  1. <pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366" name="code" class="java"><pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366"></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre></pre>  
  2. <pre></pre>  
  3. <pre></pre>  
  4. <pre></pre>  
  5. <pre></pre>  
  6. <link rel="stylesheet" href="http://static.blog.csdn.net/public/res-min/markdown_views.css?v=1.0">  
  7.                         
 
 
原文地址:https://www.cnblogs.com/austinspark-jessylu/p/7324982.html