Netty通过心跳保持长链接

 Netty自带心跳检测功能,IdleStateHandler,客户端在写空闲时主动发起心跳请求,服务器接受到心跳请求后给出一个心跳响应。当客户端在一定时间范围内不能够给出响应则断开链接。

Java代码  收藏代码
  1. public class NettyClient {  
  2.     public void connect(String remoteServer, int port) throws Exception {  
  3.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  4.         try {  
  5.             Bootstrap b = new Bootstrap();  
  6.             b.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(remoteServer, port)  
  7.                     .handler(new ChildChannelHandler());  
  8.   
  9.             ChannelFuture f = b.connect();  
  10.             System.out.println("Netty time Client connected at port " + port);  
  11.   
  12.             f.channel().closeFuture().sync();  
  13.         } finally {  
  14.             try {  
  15.                 TimeUnit.SECONDS.sleep(5);  
  16.                 try {  
  17.                     System.out.println("重新链接。。。");  
  18.                     connect(remoteServer, port);  
  19.                 } catch (Exception e) {  
  20.                     e.printStackTrace();  
  21.                 }  
  22.             } catch (Exception e) {  
  23.                 e.printStackTrace();  
  24.             }  
  25.         }  
  26.     }  
  27.   
  28.     public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
  29.   
  30.         @Override  
  31.         protected void initChannel(final SocketChannel ch) throws Exception {  
  32.             // -8表示lengthAdjustment,让解码器从0开始截取字节,并且包含消息头  
  33.             ch.pipeline().addLast(new RpcEncoder(NettyMessage.class)).addLast(new RpcDecoder(NettyMessage.class))  
  34.                     .addLast(new IdleStateHandler(120, 10, 0, TimeUnit.SECONDS)).addLast(new HeartBeatReqHandler());  
  35.         }  
  36.   
  37.     }  
  38.   
  39.     public static void main(String[] args) {  
  40.         try {  
  41.             new NettyClient().connect("127.0.0.1", 12000);  
  42.         } catch (Exception e) {  
  43.             e.printStackTrace();  
  44.         }  
  45.     }  
  46. }  
Java代码  收藏代码
  1. public class SerializationUtil {  
  2.   
  3.     private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();  
  4.   
  5.     private static Objenesis                objenesis    = new ObjenesisStd(true);  
  6.   
  7.     private static <T> Schema<T> getSchema(Class<T> clazz) {  
  8.         @SuppressWarnings("unchecked")  
  9.         Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);  
  10.         if (schema == null) {  
  11.             schema = RuntimeSchema.getSchema(clazz);  
  12.             if (schema != null) {  
  13.                 cachedSchema.put(clazz, schema);  
  14.             }  
  15.         }  
  16.         return schema;  
  17.     }  
  18.   
  19.     /** 
  20.      * 序列化 
  21.      * 
  22.      * @param obj 
  23.      * @return 
  24.      */  
  25.     public static <T> byte[] serializer(T obj) {  
  26.         @SuppressWarnings("unchecked")  
  27.         Class<T> clazz = (Class<T>) obj.getClass();  
  28.         LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);  
  29.         try {  
  30.             Schema<T> schema = getSchema(clazz);  
  31.             byte result[] = ProtostuffIOUtil.toByteArray(obj, schema, buffer);  
  32.             return result;  
  33.         } catch (Exception e) {  
  34.             throw new IllegalStateException(e.getMessage(), e);  
  35.         } finally {  
  36.             buffer.clear();  
  37.         }  
  38.     }  
  39.   
  40.     /** 
  41.      * 反序列化 
  42.      * 
  43.      * @param data 
  44.      * @param clazz 
  45.      * @return 
  46.      */  
  47.     public static <T> T deserializer(byte[] data, Class<T> clazz) {  
  48.         try {  
  49.             T obj = objenesis.newInstance(clazz);  
  50.             Schema<T> schema = getSchema(clazz);  
  51.             ProtostuffIOUtil.mergeFrom(data, obj, schema);  
  52.             return obj;  
  53.         } catch (Exception e) {  
  54.             throw new IllegalStateException(e.getMessage(), e);  
  55.         }  
  56.     }  
  57. }  
Java代码  收藏代码
  1. @SuppressWarnings("rawtypes")  
  2. public class RpcEncoder extends MessageToByteEncoder {  
  3.   
  4.     private Class<?> genericClass;  
  5.   
  6.     public RpcEncoder(Class<?> genericClass) {  
  7.         this.genericClass = genericClass;  
  8.     }  
  9.   
  10.     @Override  
  11.     public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {  
  12.         if (genericClass.isInstance(in)) {  
  13.             System.out.println("发送的请求是:"+in);  
  14.             byte[] data = SerializationUtil.serializer(in);  
  15.             out.writeInt(data.length);  
  16.             out.writeBytes(data);  
  17.         }  
  18.     }  
  19. }  
Java代码  收藏代码
  1. public class RpcDecoder extends ByteToMessageDecoder {  
  2.   
  3.     private Class<?> genericClass;  
  4.   
  5.     public RpcDecoder(Class<?> genericClass) {  
  6.         this.genericClass = genericClass;  
  7.     }  
  8.   
  9.     @Override  
  10.     public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  
  11.             throws Exception {  
  12.         if (in.readableBytes() < 4) {  
  13.             return;  
  14.         }  
  15.         in.markReaderIndex();  
  16.         int dataLength = in.readInt();  
  17.         if (dataLength < 0) {  
  18.             ctx.close();  
  19.         }  
  20.         if (in.readableBytes() < dataLength) {  
  21.             in.resetReaderIndex();  
  22.         }  
  23.         byte[] data = new byte[dataLength];  
  24.         in.readBytes(data);  
  25.   
  26.         Object obj = SerializationUtil.deserializer(data, genericClass);  
  27.         System.out.println("接收到的消息是:"+obj);  
  28.         out.add(obj);  
  29.     }  
  30. }  
Java代码  收藏代码
  1. public class HeartBeatReqHandler extends ChannelDuplexHandler {  
  2.   
  3.     /** 
  4.      * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, 
  5.      *      java.lang.Object) 
  6.      */  
  7.     @Override  
  8.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  9.         if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {  
  10.             IdleStateEvent event = (IdleStateEvent) evt;  
  11.             if (event.state() == IdleState.READER_IDLE) {  
  12.                 System.out.println("read 空闲");  
  13.                 ctx.disconnect();  
  14.             } else if (event.state() == IdleState.WRITER_IDLE) {  
  15.                 System.out.println("write 空闲");  
  16.                 ctx.writeAndFlush(buildHeartBeat(MessageType.HEARTBEAT_REQ.getType()));  
  17.             }  
  18.         }  
  19.     }  
  20.   
  21.     /** 
  22.      *  
  23.      * @return 
  24.      * @author zhangwei<wei.zw@corp.netease.com> 
  25.      */  
  26.     private NettyMessage buildHeartBeat(byte type) {  
  27.         NettyMessage msg = new NettyMessage();  
  28.         Header header = new Header();  
  29.         header.setType(type);  
  30.         msg.setHeader(header);  
  31.         return msg;  
  32.     }  
  33.   
  34. }  
Java代码  收藏代码
  1. public class NettyServer {  
  2.     public void bind(int port) throws Exception {  
  3.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
  4.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  5.         try {  
  6.             ServerBootstrap b = new ServerBootstrap();  
  7.             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)  
  8.                     .childHandler(new ChildChannelHandler());  
  9.   
  10.             ChannelFuture f = b.bind(port).sync();  
  11.             System.out.println("Netty time Server started at port " + port);  
  12.             f.channel().closeFuture().sync();  
  13.         } finally {  
  14.             bossGroup.shutdownGracefully();  
  15.             workerGroup.shutdownGracefully();  
  16.         }  
  17.     }  
  18.   
  19.     public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
  20.   
  21.         @Override  
  22.         protected void initChannel(final SocketChannel ch) throws Exception {  
  23.             ch.pipeline().addLast(new RpcDecoder(NettyMessage.class)).addLast(new RpcEncoder(NettyMessage.class))  
  24.                     .addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)).addLast(new HeartBeatRespHandler());  
  25.         }  
  26.   
  27.     }  
  28.   
  29.     public static void main(String[] args) {  
  30.         try {  
  31.             new NettyServer().bind(12000);  
  32.         } catch (Exception e) {  
  33.             e.printStackTrace();  
  34.         }  
  35.     }  
  36. }  
Java代码  收藏代码
  1. public enum MessageType {  
  2.   
  3.     LOGIN_REQ((byte) 1), LOGIN_RESP((byte) 2), HEARTBEAT_REQ((byte) 3), HEARTBEAT_RESP((byte) 4);  
  4.     private byte type;  
  5.   
  6.     /** 
  7.      * @param type 
  8.      */  
  9.     private MessageType(byte type) {  
  10.         this.type = type;  
  11.     }  
  12.   
  13.     public byte getType() {  
  14.         return type;  
  15.     }  
  16.   
  17.     public void setType(byte type) {  
  18.         this.type = type;  
  19.     }  
  20.   
  21.     public static MessageType getMessageType(byte type) {  
  22.         for (MessageType b : MessageType.values()) {  
  23.             if (b.getType() == type) {  
  24.                 return b;  
  25.             }  
  26.         }  
  27.         return null;  
  28.     }  
  29.   
  30. }  
Java代码  收藏代码
  1. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<NettyMessage> {  
  2.   
  3.     /** 
  4.      * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, 
  5.      *      java.lang.Object) 
  6.      */  
  7.     @Override  
  8.     protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {  
  9.         if (msg.getHeader() != null && msg.getHeader().getType() == MessageType.HEARTBEAT_REQ.getType()) {  
  10.             NettyMessage heartBeat = buildHeartBeat(MessageType.HEARTBEAT_RESP.getType());  
  11.             ctx.writeAndFlush(heartBeat);  
  12.         } else {  
  13.             ctx.fireChannelRead(msg);  
  14.         }  
  15.     }  
  16.       
  17.   
  18.     /** 
  19.      * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, 
  20.      *      java.lang.Object) 
  21.      */  
  22.     @Override  
  23.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  24.         if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {  
  25.             IdleStateEvent event = (IdleStateEvent) evt;  
  26.             if (event.state() == IdleState.READER_IDLE) {  
  27.                 System.out.println("read 空闲 关闭链接");  
  28.                 ctx.disconnect();     
  29.             }   
  30.         }  
  31.     }  
  32.       
  33.   
  34.     /** 
  35.      *  
  36.      * @return 
  37.      * @author zhangwei<wei.zw@corp.netease.com> 
  38.      */  
  39.     private NettyMessage buildHeartBeat(byte type) {  
  40.         NettyMessage msg = new NettyMessage();  
  41.         Header header = new Header();  
  42.         header.setType(type);  
  43.         msg.setHeader(header);  
  44.         return msg;  
  45.     }  
  46.   
  47. }  
Java代码  收藏代码
  1. public class NettyMessage implements Serializable{  
  2.       
  3.     /**  */  
  4.     private static final long serialVersionUID = 1L;  
  5.   
  6.     private Header header;  
  7.       
  8.     private Object body;  
  9.   
  10.     public Header getHeader() {  
  11.         return header;  
  12.     }  
  13.   
  14.     public void setHeader(Header header) {  
  15.         this.header = header;  
  16.     }  
  17.   
  18.     public Object getBody() {  
  19.         return body;  
  20.     }  
  21.   
  22.     public void setBody(Object body) {  
  23.         this.body = body;  
  24.     }  
  25.   
  26.     /**  
  27.      * @see java.lang.Object#toString() 
  28.      */  
  29.     @Override  
  30.     public String toString() {  
  31.         return "NettyMessage [header=" + header + ", body=" + body + "]";  
  32.     }  
  33.       
  34.       
  35. }  
Java代码  收藏代码
  1. public class Header implements Serializable{  
  2.     /**  */  
  3.     private static final long serialVersionUID = 1L;  
  4.     private int crcCode=0xabef0101;  
  5.     private int length;  
  6.     private long sessionId;  
  7.     private byte type;  
  8.     private byte priority;  
  9.     private Map<String,Object> attachment=new HashMap<>();  
  10.     public int getCrcCode() {  
  11.         return crcCode;  
  12.     }  
  13.     public void setCrcCode(int crcCode) {  
  14.         this.crcCode = crcCode;  
  15.     }  
  16.     public int getLength() {  
  17.         return length;  
  18.     }  
  19.     public void setLength(int length) {  
  20.         this.length = length;  
  21.     }  
  22.     public long getSessionId() {  
  23.         return sessionId;  
  24.     }  
  25.     public void setSessionId(long sessionId) {  
  26.         this.sessionId = sessionId;  
  27.     }  
  28.     public byte getType() {  
  29.         return type;  
  30.     }  
  31.     public void setType(byte type) {  
  32.         this.type = type;  
  33.     }  
  34.     public byte getPriority() {  
  35.         return priority;  
  36.     }  
  37.     public void setPriority(byte priority) {  
  38.         this.priority = priority;  
  39.     }  
  40.     public Map<String, Object> getAttachment() {  
  41.         return attachment;  
  42.     }  
  43.     public void setAttachment(Map<String, Object> attachment) {  
  44.         this.attachment = attachment;  
  45.     }  
  46.     /**  
  47.      * @see java.lang.Object#toString() 
  48.      */  
  49.     @Override  
  50.     public String toString() {  
  51.         return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionId=" + sessionId + ", type=" + type  
  52.                 + ", priority=" + priority + ", attachment=" + attachment + "]";  
  53.     }  
  54.       
  55.       
  56. }  

客户端的结果是:

Java代码  收藏代码
  1. etty time Client connected at port 12000  
  2. write 空闲  
  3. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  4. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  5. write 空闲  
  6. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  7. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  8. write 空闲  
  9. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  10. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  11. write 空闲  
  12. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  13. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  14. write 空闲  
  15. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  16. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
原文地址:https://www.cnblogs.com/austinspark-jessylu/p/7324909.html