实时大规模数据的订阅和推送服务

1. 背景  

     服务后台实时收集千万级别在线终端、全国近400个城市的出租车、手机和pad等移动终端的位置点gps信息,然后根据gps所在城市区域,持久化并推送分发给不同的订阅用户。

     其业务逻辑图如下:

                               

 

 1.1 需求特征

  a 实时性(gps点本身具有实时性的特征,例如打车服务,需要周边实时出租车位置信息)

  b 数据量大(全国实时gps点数据规模 T级别/per day,高峰期时达到1G/min) 

 1.2 推送方式选择    

  数据推送方式通常有两种类型:  

  a Pull方式,这种方式服务端开发相对简单,可以采用缓存+httpserver的方式解决;

  b Push方式,这种方式通常满足实时性的需求,对服务端而言逻辑相对复杂,需要维持大并发的连接和发送

  由于实时性和大数据量需求的特征,所以系统采用"Push+长连接方式"进行推送。当然实现一套支持实时海量数据和客户推送的系统,需要解决的关键技术问题有很多: 如分布式集群,集群的failover和balancer能力,集群节点的配置管理等等。本系统借鉴hadoop的RPC模块,实现一套订阅发布实时推送服务,下面主要说说为了提高单节点的并发和吞吐量的些trick. 

2. 架构图    

3. 性能优化

 3.1 异步数据发送

   异步发送逻辑如下:  

private int channelIO(WritableByteChannel writeCh, ByteBuffer buf)
            throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;
        while (buf.remaining() > 0) {
            try {
                int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
                buf.limit(buf.position() + ioSize);
                ret = writeCh.write(buf);
                if (ret < ioSize) {
                    break;
                }
            } finally {
                buf.limit(originalLimit);
            }
        }
        int nBytes = initialRemaining - buf.remaining(); 
        return (nBytes > 0) ? nBytes : ret;
    }

   这里主要有两优化点:①为防止待写的数据量过大导致独占线程时间片过长,在8行代码对ByteBuffer进行了分片发送(尽管tcp尽量避免数据分片和组包),②在11行代码,通道没法写完数据时,应让出线程,立刻返回注册到selector,待下次writeCh通道变成writable可写状态时,再进行channelIO写操作(这是niobio的最大区别)。 

 3.2 用多selector机制,分离网络读写操作

      ReadSelector负责监听用户的请求和鉴权响应,若用户请求为合法,则把相应连接注册给WriteSelector;WriteSelector负责将接收的实时gps点数据推送给已鉴权成功的注册用户连接         

 3.3 使用多selector机制,进行异步写数据

     可以根据客户端的端口hash到不同的Selector上去执行写的操作,如下: 

    private Responder selectResponder(int remotePort){
        int index = Math.abs(remotePort % responderCount);
        return responders[index];
    }

4. 容错健壮性

      最后还得考虑实时数据流大和频率高的特征,当存在网络不好或带宽不足时,服务会存在数据发送不赢而导致堆积的潜在风险。所以为每个连接增加个队列ResponseQueue,来维护待发送数据集。只有数据队列中存在数据时,就将相应连接注册到WriteSelector。如下图:

                                       

        这里主要用到两个trick:  

   4.1 避免服务数据堆积

     当网络状况不好对方接收较慢或发送数据量比较大时,这两种情况下,都会造成服务数据堆积。因此,引入参数连接的缓冲数据队列大小限制maxAllowedQueueSize如果数据批次队列大于maxAllowedQueueSize,则直接丢弃,避免数据无上限增长,如下代码: 

void doRespond(Call call) throws IOException {
        try {
                    synchronized (call.connection.responseQueue) {
                        if (call.connection.responseQueue.size() < maxAllowedQueueSize) {
                            call.connection.responseQueue.addLast(call);
                            if (call.connection.responseQueue.size() == 1) {
                                processResponse(call.connection.responseQueue, true);
                            }
                        } else {
                            logger.warn(
                                    "incoming data discarded from connection {}",
                                    call.connection);
                        }
                    }
                } catch (NullPointerException e) {
                    logger.error(e.getMessage(), e);
                }
            }

   4.2 定期扫描和关闭坏掉的连接资源

     这里的坏掉是指数据在一段时间内一直停留在连接connection的数据队列里,则认为该连接已失效而直接清理队列数据和关闭相应连接。代码如下: 

            private void doPurge(Call call, long now) throws IOException {
              if(call.connection == null || call.connection.responseQueue == null){
                  return ;
              }
              LinkedList<Call> responseQueue = call.connection.responseQueue;
              synchronized (responseQueue) {
                Iterator<Call> iter = responseQueue.listIterator(0);
                while (iter.hasNext()) {
                  call = iter.next();
                  if (now > call.timestamp + PURGE_INTERVAL) {
                    logger.info("dalay of current connection {}  exceeds 10 mins",call.connection);
                    closeConnection(call.connection);
                    
                  }
                }
              }
            }

     希望对有类似需求的网友能提供些参考和讨论。

原文地址:https://www.cnblogs.com/gisorange/p/3581493.html