Mina 系列(四)之KeepAliveFilter -- 心跳检测

Mina 系列(四)之KeepAliveFilter -- 心跳检测

摘要: 心跳协议,对基于CS模式的系统开发来说是一种比较常见与有效的连接检测方式,最近在用MINA框架,原本自己写了一个心跳协议实现,后来突然发现MINA本身带有这样一个心跳实现,感于对框架的小小崇拜,在实践的同时研究了一下!

MINA 本身提供了一个过滤器类: org.apache.mina.filter.keepalive.KeepAliveFilter,该过滤器用于在 IO 空闲的时候发送并且反馈心跳包(keep-alive request/response)。

KeepAliveFilter

KeepAliveFilter 构造器

public KeepAliveFilter(KeepAliveMessageFactory messageFactory, IdleStatus interestedIdleStatus,
        KeepAliveRequestTimeoutHandler policy) {
    this(messageFactory, interestedIdleStatus, policy, 60, 30);
}
  • KeepAvlieMessageFactory:该实例引用用于判断接受与发送的包是否是心跳包,以及心跳请求包的实现

  • IdleStatus:该过滤器所关注的空闲状态,默认认为读取空闲。即当读取通道空闲的时候发送心跳包

  • KeepAliveRequestTimeoutHandler:心跳包请求后超时无反馈情况下的处理机制,默认为 CLOSE,即关闭连接

首先需要实现接口 KeepAliveMessageFactory。该接口中的抽象方法有:

Modifier and Type Method and Description
Object getRequest(IoSession session)
Returns a (new) keep-alive request message.
Object getResponse(IoSession session, Object request)
Returns a (new) response message for the specified keep-alive request.
boolean isRequest(IoSession session, Object message) Returns true if and only if the specified message is a keep-alive request message.
boolean isResponse(IoSession session, Object message)Returns true if and only if the specified message is a keep-alive response message;
一般来说心跳机制主要分为以下四类:
  1. active 活跃型(活跃型心跳机制):当读取通道空闲的时候发送心跳请求,一旦该心跳请求被发送,那么需要在 keepAliveRequestTimeout 时间内接收到心跳反馈,否则 KeepAliveRequestTimeoutHandler 将会被调用,当一个心跳请求包被接受到后,那么心跳反馈也会立即发出。

    KeepAliveMessageFactory 类的实现方法:

    • getRequest(IoSession session) 必须反馈 non-null
    • getResponse( IoSession session, Object request) 必须反馈 non-null
  2. semi-active 半活跃型(半活跃型心跳机制):当读取通道空闲的时候发送心跳请求,然而并不在乎心跳反馈有没有,当一个心跳请求包被接收到后,那么心跳反馈也会立即发出。

    KeepAliveMessageFactory 类的实现方法:

    • getRequest(IoSession session) 必须反馈 non-null
    • getResponse( IoSession session, Object request) 必须反馈 non-null

    心跳包请求超时后的处理机制设置为 KeepAliveRequestTimeoutHandler.NOOP(不做任何处理),KeepAliveRequestTimeoutHandler.LOG(只输出警告信息不做其他处理)

  3. passive 被动型(半活跃型心跳机制):当前 IO 不希望主动发送心跳请求,但是当接受到一个心跳请求后,那么该心跳反馈也会立即发出。

    KeepAliveMessageFactory 类的实现方法:

    • getRequest(IoSession session) 必须反馈 null
    • getResponse( IoSession session, Object request) 必须反馈 non-null
  4. deaf speaker 聋子型(聋子型心跳机制):当前IO会主动发送心跳请求,但是不想发送任何心跳反馈。

    KeepAliveMessageFactory 类的实现方法:

    • getRequest(IoSession session) 必须反馈 non-null
    • getResponse( IoSession session, Object request) 必须反馈 null

    心跳包请求超时后的处理机制设置为 KeepAliveRequestTimeoutHandler.DEAF_SPEAKER

  5. sient-listener 持续监听型(持续监听型心跳机制):既不想发送心跳请求也不想发送心跳反馈。

    KeepAliveMessageFactory 类的实现方法:

    • getRequest(IoSession session) 必须反馈 null
    • getResponse( IoSession session, Object request) 必须反馈 null
心跳包请求超时后的处理机制

接口 KeepAliveRequestTimeoutHandler,一般该处理主要是针对能够发送心跳请求的心跳机制。

  1. CLOSE: 关闭连接
  2. LOG:输出 警告信息
  3. NOOP:不做任何处理
  4. EXCEPTION:抛出异常
  5. DEAF_SPEAKER: 一个特殊的处理,停止当前过滤器对对心跳反馈监听,因此让过滤器丢失请求超时的侦测功能。(让其变成聋子)
  6. keepAliveRequestTimeout(KeepAliveFilter filter, IoSession session):自定义处理

KeepAliveFilter 配制

KeepAliveMessageFactoryImpl kamfi = new KeepAliveMessageFactoryImpl();
KeepAliveFilter kaf = new KeepAliveFilter(kamfi, IdleStatus.READER_IDLE, KeepAliveRequestTimeoutHandler.CLOSE);
// idle 事件回调
kaf.setForwardEvent(true);
// 心跳检测间隔时间
kaf.setRequestInterval(60);
// 心跳检测超时时间
kaf.setRequestTimeout(30);
  • setForwardEvent 使用了 KeepAliveFilter 之后,IoHandlerAdapter 中的 sessionIdle 方法默认是不会再被调用的! 所以必须加入这句话 sessionIdle 才会被调用

  • setRequestInterval 设置心跳包请求时间间隔,其实对于被动型的心跳机制来说,设置心跳包请求间隔貌似是没有用的,因为它是不会发送心跳包的,但是它会触发 sessionIdle 事件, 我们利用该方法,可以来判断客户端是否在该时间间隔内没有发心跳包,一旦 sessionIdle 方法被调用,则认为 客户端丢失连接并将其踢出。因此其中参数 heartPeriod 其实就是服务器对于客户端的 IDLE 监控时间。默认 60 s。

  • setRequestTimeout 超时时间,如果当前发出一个心跳请求后需要反馈。默认 30 s

下面对客户端与服务端和分别举个例子

服务器

以被动型心跳机制为例,服务器在接受到客户端连接以后被动接受心跳请求,当在规定时间内没有收到客户端心跳请求时 将客户端连接关闭。

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;

public class Server {

    private static final int PORT = 9123;
    /** 30秒后超时 */
    private static final int IDEL_TIMEOUT = 30;
    /** 15秒发送一次心跳包 */
    private static final int HEART_BEAT_RATE = 15;
    /** 心跳包内容 */
    private static final String HEART_BEAT_REQUEST = "0x11";
    private static final String HEART_BEAT_RESPONSE = "0x12";
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getSessionConfig().setReadBufferSize(1024);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDEL_TIMEOUT);

        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        acceptor.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new TextLineCodecFactory()));

        KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
        KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE);
        // 设置是否forward到下一个filter
        heartBeat.setForwardEvent(true);
        // 设置心跳频率
        heartBeat.setRequestInterval(HEART_BEAT_RATE);

        acceptor.getFilterChain().addLast("heartbeat", heartBeat);

        acceptor.setHandler(new IoHandlerAdapter());
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("Server started on port: " + PORT);
    }

    /**
     * 被动型心跳机制,服务器在接受到客户端连接以后被动接受心跳请求,当在规定时间内没有收到客户端心跳请求时将客户端连接关闭
     * @ClassName KeepAliveMessageFactoryImpl
     * @Description 内部类,实现 KeepAliveMessageFactory(心跳工厂)
     */
    private static class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {

        /* 判断是否心跳请求包,是的话返回true */
        @Override
        public boolean isRequest(IoSession session, Object message) {
            LOG.info("请求心跳包信息: " + message);
            return message.equals(HEART_BEAT_REQUEST);
        }

        /* 由于被动型心跳机制,没有请求当然也就不关注反馈,因此直接返回 false */
        @Override
        public boolean isResponse(IoSession session, Object message) {
            return false;
        }

        /* 被动型心跳机制无请求,因此直接返回 null */
        @Override
        public Object getRequest(IoSession session) {
            return null;
        }

        /* 根据心跳请求 request,反回一个心跳反馈消息 non-null  */
        @Override
        public Object getResponse(IoSession session, Object request) {
            LOG.info("响应预设信息: " + HEART_BEAT_RESPONSE);
            return HEART_BEAT_RESPONSE;
        }
    }
}

客户端

客户端会定时发送心跳请求(注意定时时间必须小于,服务器端的IDLE监控时间),同时需要监听心跳反馈,以此来判断是否与服务器丢失连接。对于服务器的心跳请求不给与反馈。

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;

public class Client {

    private static final int PORT = 9123;
    /** 30秒后超时 */
    private static final int IDEL_TIMEOUT = 30;
    /** 15秒发送一次心跳包 */
    private static final int HEART_BEAT_RATE = 15;
    /** 心跳包内容 */
    private static final String HEART_BEAT_REQUEST = "0x11";
    private static final String HEART_BEAT_RESPONSE = "0x12";
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws IOException {
        IoConnector connector = new NioSocketConnector();
        connector.getSessionConfig().setReadBufferSize(1024);
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDEL_TIMEOUT);

        connector.getFilterChain().addLast("logger", new LoggingFilter());
        connector.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new TextLineCodecFactory()));

        KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
        KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE);
        // 设置是否forward到下一个filter
        heartBeat.setForwardEvent(true);
        // 设置心跳频率
        heartBeat.setRequestInterval(HEART_BEAT_RATE);

        connector.getFilterChain().addLast("heartbeat", heartBeat);

        connector.setHandler(new IoHandlerAdapter());
        connector.connect(new InetSocketAddress("127.0.0.1", PORT));
        System.out.println("Server started on port: " + PORT);
    }

    /**
     * 被动型心跳机制,服务器在接受到客户端连接以后被动接受心跳请求,当在规定时间内没有收到客户端心跳请求时将客户端连接关闭
     * @ClassName KeepAliveMessageFactoryImpl
     * @Description 内部类,实现KeepAliveMessageFactory(心跳工厂)
     * @author cruise
     *
     */
    private static class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {

        /* 服务器不会给客户端发送请求包,因此不关注请求包,直接返回 false   */
        @Override
        public boolean isRequest(IoSession session, Object message) {
            return false;
        }

        /* 客户端关注请求反馈,因此判断 mesaage 是否是反馈包 */
        @Override
        public boolean isResponse(IoSession session, Object message) {
            LOG.info("响应预设信息: " + message);
            return message.equals(HEART_BEAT_RESPONSE);
        }

        /* 获取心跳请求包 non-null */
        @Override
        public Object getRequest(IoSession session) {
            LOG.info("请求预设信息: " + HEART_BEAT_REQUEST);
            return HEART_BEAT_REQUEST;
        }

        /* 服务器不会给客户端发送心跳请求,客户端当然也不用反馈,该方法返回 null  */
        @Override
        public Object getResponse(IoSession session, Object request) {
            return null;
        }
    }
}
原文地址:https://www.cnblogs.com/binarylei/p/8496466.html