【Netty】利用Netty实现心跳检测和重连机制

一、前言

  心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
  我们用到的很多框架都用到了心跳检测,比如服务注册到 Eureka Server 之后会维护一个心跳连接,告诉 Eureka Server 自己还活着。本文就是利用 Netty 来实现心跳检测,以及客户端重连。

二、设计思路

  1. 分为客户端和服务端
  2. 建立连接后,客户端先发送一个消息询问服务端是否可以进行通信了。
  3. 客户端收到服务端 Yes 的应答后,主动发送心跳消息,服务端接收到心跳消息后,返回心跳应答,周而复始。
  4. 心跳超时利用 Netty 的 ReadTimeOutHandler 机制,当一定周期内(默认值50s)没有读取到对方任何消息时,需要主动关闭链路。如果是客户端,重新发起连接。
  5. 为了避免出现粘/拆包问题,使用 DelimiterBasedFrameDecoderStringDecoder 来处理消息。

三、编码

  1. 先编写客户端 NettyClient
  1. public class NettyClient
  2.  
  3. private static final String HOST = "127.0.0.1"
  4.  
  5. private static final int PORT = 9911
  6.  
  7. private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); 
  8.  
  9. EventLoopGroup group = new NioEventLoopGroup(); 
  10.  
  11.  
  12. private void connect(String host,int port)
  13. try
  14. Bootstrap b = new Bootstrap(); 
  15. b.group(group) 
  16. .channel(NioSocketChannel.class) 
  17. .option(ChannelOption.TCP_NODELAY,true
  18. .remoteAddress(new InetSocketAddress(host,port)) 
  19. .handler(new ChannelInitializer<SocketChannel>() { 
  20. @Override 
  21. protected void initChannel(SocketChannel ch) throws Exception
  22. ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8); 
  23. ch.pipeline() 
  24. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
  25. .addLast(new StringDecoder()) 
  26. // 当一定周期内(默认50s)没有收到对方任何消息时,需要主动关闭链接 
  27. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
  28. .addLast("heartBeatHandler",new HeartBeatReqHandler()); 
  29. }); 
  30. // 发起异步连接操作 
  31. ChannelFuture future = b.connect().sync(); 
  32. future.channel().closeFuture().sync(); 
  33. }catch (Exception e){ 
  34. e.printStackTrace(); 
  35. }finally
  36. // 所有资源释放完之后,清空资源,再次发起重连操作 
  37. executor.execute(()->{ 
  38. try
  39. TimeUnit.SECONDS.sleep(5); 
  40. //发起重连操作 
  41. connect(NettyClient.HOST,NettyClient.PORT); 
  42. } catch (InterruptedException e) { 
  43. e.printStackTrace(); 
  44. }); 
  45.  
  46. public static void main(String[] args)
  47. new NettyClient().connect(NettyClient.HOST,NettyClient.PORT); 
  48.  

这里稍微复杂点的就是38行开始的重连部分。
2. 心跳消息发送类 HeartBeatReqHandler

  1. package cn.sp.heartbeat; 
  2.  
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandler; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.SimpleChannelInboundHandler; 
  7.  
  8. import java.util.concurrent.ScheduledFuture; 
  9. import java.util.concurrent.TimeUnit; 
  10.  
  11. /** 
  12. * Created by 2YSP on 2019/5/23. 
  13. */ 
  14. @ChannelHandler.Sharable 
  15. public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String>
  16.  
  17. private volatile ScheduledFuture<?> heartBeat; 
  18.  
  19. private static final String hello = "start notify with server$_"
  20.  
  21. @Override 
  22. public void channelActive(ChannelHandlerContext ctx) throws Exception
  23. ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())); 
  24. System.out.println("================"); 
  25.  
  26. @Override 
  27. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
  28. if (heartBeat != null){ 
  29. heartBeat.cancel(true); 
  30. heartBeat = null
  31. ctx.fireExceptionCaught(cause); 
  32.  
  33. @Override 
  34. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
  35. if ("ok".equalsIgnoreCase(msg)){ 
  36. //服务端返回ok开始心跳 
  37. heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS); 
  38. }else
  39. System.out.println("Client receive server heart beat message : --->"+msg); 
  40.  
  41. private class HeartBeatTask implements Runnable
  42.  
  43. private final ChannelHandlerContext ctx; 
  44.  
  45. public HeartBeatTask(ChannelHandlerContext ctx)
  46. this.ctx = ctx; 
  47.  
  48.  
  49. @Override 
  50. public void run()
  51. String heartBeat = "I am ok"
  52. System.out.println("Client send heart beat message to server: ----->"+heartBeat); 
  53. ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes())); 
  54.  

channelActive()方法在首次建立连接后向服务端问好,如果服务端返回了 "ok" 就创建一个线程每隔5秒发送一次心跳消息。如果发生了异常,就取消定时任务并将其设置为 null,等待 GC 回收。
3. 服务端 NettyServer

  1. public class NettyServer
  2.  
  3. public static void main(String[] args)
  4. new NettyServer().bind(9911); 
  5.  
  6. private void bind(int port)
  7. EventLoopGroup group = new NioEventLoopGroup(); 
  8. try
  9. ServerBootstrap b = new ServerBootstrap(); 
  10. b.group(group) 
  11. .channel(NioServerSocketChannel.class) 
  12. .childHandler(new ChannelInitializer<SocketChannel>() { 
  13. @Override 
  14. protected void initChannel(SocketChannel ch) throws Exception
  15. ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 
  16.  
  17. ch.pipeline() 
  18. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
  19. .addLast(new StringDecoder()) 
  20. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
  21. .addLast("HeartBeatHandler",new HeartBeatRespHandler()); 
  22. }); 
  23. // 绑定端口,同步等待成功 
  24. b.bind(port).sync(); 
  25. System.out.println("Netty Server start ok ...."); 
  26. }catch (Exception e){ 
  27. e.printStackTrace(); 
  1. 心跳响应类 HeartBeatRespHandler
  1. package cn.sp.heartbeat; 
  2.  
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandler; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.SimpleChannelInboundHandler; 
  7.  
  8. /** 
  9. * Created by 2YSP on 2019/5/23. 
  10. */ 
  11. @ChannelHandler.Sharable 
  12. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String>
  13.  
  14. private static final String resp = "I have received successfully$_"
  15.  
  16. @Override 
  17. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
  18. if (msg.equals("start notify with server")){ 
  19. ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes())); 
  20. }else
  21. //返回心跳应答信息 
  22. System.out.println("Receive client heart beat message: ---->"+ msg); 
  23. ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); 
  24.  
  25.  

第一次告诉客户端我已经准备好了,后面打印客户端发过来的信息并告诉客户端我已经收到你的消息了。

四、测试

启动服务端再启动客户端,可以看到心跳检测正常,如下图。

 

服务端控制台
服务端控制台

 

 

客户端控制台
客户端控制台

现在让服务端宕机一段时间,看客户端能否重连并开始正常工作。

 

关闭服务端后,客户端周期性的连接失败,控制台输出如图:

 

连接失败
连接失败

重新启动服务端,过一会儿发现重连成功了。

 

 

成功重连
成功重连

 

五、总结

总得来说,使用 Netty 实现心跳检测还是比较简单的,这里比较懒没有使用其他序列化协议(如 ProtoBuf 等),如果感兴趣的话大家可以自己试试。
代码地址,点击这里
有篇SpringBoot 整合长连接心跳机制的文章写的也很不错,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/

原文地址:https://www.cnblogs.com/2YSP/p/10917664.html