HBase Source Code Analysis: Server-Side RPC

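The server side of HBase RPC covers both the master and the RegionServer. This post walks through how RPC is created, started, and how requests are handled in the master and the RegionServer.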

1. Server RPC initialization

First, recall the overall flowchart of the HBase RPC side from the previous RPC overview post.

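Because HMaster extends HRegionServer, the RPC-related member variables of both the master and the region server live mainly in HRegionServer, chiefly rpcServices and rpcClient. This post covers rpcServices; RpcClient will be discussed separately.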

During master and region server startup, RPC is initialized and started in the following steps:

1. The HRegionServer constructor calls createRpcServices() to create an RSRpcServices object. When a master starts, this call happens as well, because HRegionServer is the parent class of HMaster.

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }
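The structure here is a factory method called from a base-class constructor: whichever class is actually being constructed decides which RPC services object gets created. A minimal sketch, using hypothetical stand-in classes (RegionServerLike, MasterLike, ServicesHolder) rather than the real HBase types, and assuming the master side overrides the factory method:

```java
// Hypothetical stand-ins; only the call structure mirrors
// HRegionServer -> createRpcServices() described above.
class ServicesHolder {
    final String kind;

    ServicesHolder(String kind) {
        this.kind = kind;
    }
}

class RegionServerLike {
    final ServicesHolder rpcServices;

    RegionServerLike() {
        // The base-class constructor calls an overridable factory method,
        // so the runtime class decides which holder gets built.
        this.rpcServices = createRpcServices();
    }

    protected ServicesHolder createRpcServices() {
        return new ServicesHolder("region-server");
    }
}

class MasterLike extends RegionServerLike {
    @Override
    protected ServicesHolder createRpcServices() {
        // Runs before MasterLike's own field initializers,
        // so it must not depend on MasterLike instance fields.
        return new ServicesHolder("master");
    }
}
```

Here new MasterLike().rpcServices.kind is "master" even though the assignment sits in RegionServerLike's constructor. The usual Java caveat applies: the override runs before the subclass constructor body, so it should only rely on state the base class has already set up.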

2. The RSRpcServices constructor creates the RpcServer object.

    rpcServer = new RpcServer(rs, name, getServices(),
      bindAddress, // use final bindAddress for this server.
      rs.conf,
      rpcSchedulerFactory.create(rs.conf, this, rs));

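While the RpcServer is being constructed, the master and the region server supply different getServices() implementations, so that HMaster and HRegionServer answer different sets of RPC services (on the master side, createRpcServices() is overridden to return a MasterRpcServices, which overrides getServices()).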
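To make the getServices() split concrete, here is a small model in which each server type returns a list of named services and a connection is bound to one of them by the service name it asks for (the source later reads that name via connectionHeader.getServiceName()). NamedService, RegionServerServices, MasterServicesSketch and ServiceLookup are hypothetical; the service names are illustrative, and the master list keeping the inherited region server services is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a protobuf BlockingService: just a name here.
class NamedService {
    final String name;

    NamedService(String name) {
        this.name = name;
    }
}

// Plays the role of the region server's getServices().
class RegionServerServices {
    List<NamedService> getServices() {
        List<NamedService> list = new ArrayList<>();
        list.add(new NamedService("ClientService"));
        list.add(new NamedService("AdminService"));
        return list;
    }
}

// Plays the role of the master's override; keeping the inherited
// services is an assumption of this sketch.
class MasterServicesSketch extends RegionServerServices {
    @Override
    List<NamedService> getServices() {
        List<NamedService> list = new ArrayList<>();
        list.add(new NamedService("MasterService"));
        list.addAll(super.getServices());
        return list;
    }
}

// Resolves the service name a connection asks for against one server's list.
class ServiceLookup {
    private final Map<String, NamedService> byName = new HashMap<>();

    ServiceLookup(List<NamedService> services) {
        for (NamedService s : services) {
            byName.put(s.name, s);
        }
    }

    NamedService resolve(String serviceName) {
        NamedService s = byName.get(serviceName);
        if (s == null) {
            throw new IllegalArgumentException("Unknown service: " + serviceName);
        }
        return s;
    }
}
```

A region server built from RegionServerServices rejects "MasterService", while the master resolves both its own service and the inherited ones.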

3. The RpcServer constructor creates the Listener and the Responder, and initializes the scheduler that RSRpcServices passed in:

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
    this.delayedCalls = new AtomicInteger(0);
    this.ipcUtil = new IPCUtil(conf);

    // Create the responder here
    responder = new Responder();
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      HBaseSaslRpcServer.init(conf);
    }
    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));
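The first lines bind the listener and then read the port back from it. A minimal Java NIO sketch of that idea, assuming the listener binds a ServerSocketChannel when it is constructed. ListenerSketch is hypothetical, and applying tcpNoDelay/tcpKeepAlive to each accepted SocketChannel is an assumption about where those two flags end up:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical listener: binds in the constructor, reports the real port afterwards.
class ListenerSketch implements AutoCloseable {
    private final ServerSocketChannel acceptChannel;
    private final boolean tcpNoDelay;
    private final boolean tcpKeepAlive;

    ListenerSketch(String host, int port, boolean tcpNoDelay, boolean tcpKeepAlive)
            throws IOException {
        this.acceptChannel = ServerSocketChannel.open();
        this.acceptChannel.configureBlocking(false);
        // Port 0 lets the OS pick a free port, so the actual port must be read back.
        this.acceptChannel.bind(new InetSocketAddress(host, port), 128);
        this.tcpNoDelay = tcpNoDelay;
        this.tcpKeepAlive = tcpKeepAlive;
    }

    InetSocketAddress getAddress() throws IOException {
        return (InetSocketAddress) acceptChannel.getLocalAddress();
    }

    // Apply the per-connection socket options to a connected channel.
    void configure(SocketChannel channel) throws IOException {
        channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
        channel.setOption(StandardSocketOptions.SO_KEEPALIVE, tcpKeepAlive);
    }

    @Override
    public void close() throws IOException {
        acceptChannel.close();
    }
}
```

Reading the port from the bound socket rather than from configuration keeps this.port correct even when the configured port is 0.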

4. The Listener constructor also creates readThreads Reader objects, which read incoming requests:

      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder().setNameFormat(
          "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
          ",port=" + port).setDaemon(true).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }
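The reader setup is a fixed-size pool of named daemon threads, one per Reader. A plain-JDK sketch that swaps Guava's ThreadFactoryBuilder for a hand-written ThreadFactory. NamedDaemonFactory and ReaderPoolSketch are hypothetical, and each submitted task only records which thread ran it, standing in for a Reader's run loop:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK replacement for the ThreadFactoryBuilder call above:
// numbered daemon threads named from a String.format pattern.
class NamedDaemonFactory implements ThreadFactory {
    private final String nameFormat;
    private final AtomicInteger counter = new AtomicInteger(0);

    NamedDaemonFactory(String nameFormat) {
        this.nameFormat = nameFormat;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, String.format(nameFormat, counter.getAndIncrement()));
        t.setDaemon(true);   // reader threads should not keep the JVM alive
        return t;
    }
}

class ReaderPoolSketch {
    // Starts readThreads tasks on a fixed pool and returns the thread name each ran on.
    static String[] startReaders(int readThreads, String host, int port)
            throws InterruptedException {
        final String[] names = new String[readThreads];
        final CountDownLatch started = new CountDownLatch(readThreads);
        ExecutorService readPool = Executors.newFixedThreadPool(readThreads,
            new NamedDaemonFactory("RpcServer.reader=%d,bindAddress=" + host + ",port=" + port));
        for (int i = 0; i < readThreads; ++i) {
            final int idx = i;
            readPool.execute(() -> {
                names[idx] = Thread.currentThread().getName();
                started.countDown();
            });
        }
        started.await();     // also makes the writes to names visible here
        readPool.shutdown();
        return names;
    }
}
```

Because a fixed pool creates a new thread for each of its first readThreads tasks, every Reader gets its own long-lived thread.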

Once these objects are constructed, the HRegionServer constructor calls rpcServices.start(), which in turn calls rpcServer.start(). RpcServer.start() starts the responder, the listener, and the scheduler:

  public synchronized void start() {
    if (started) return;
    authTokenSecretMgr = createSecretManager();
    if (authTokenSecretMgr != null) {
      setSecretManager(authTokenSecretMgr);
      authTokenSecretMgr.start();
    }
    this.authManager = new ServiceAuthorizationManager();
    HBasePolicyProvider.init(conf, authManager);
    responder.start();
    listener.start();
    scheduler.start();
    started = true;
  }
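start() is synchronized and guarded by the started flag, so repeated calls are harmless and the components come up in a fixed order. A hypothetical sketch of just that shape (Component and RpcServerStartSketch are illustrative names; the secret-manager and authorization setup is left out):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal interface for something with a start() step.
interface Component {
    void start();
}

class RpcServerStartSketch {
    private final List<Component> startOrder = new ArrayList<>();
    private boolean started = false;   // guarded by "this"

    RpcServerStartSketch(Component responder, Component listener, Component scheduler) {
        // Same order as RpcServer.start(): responder, listener, scheduler.
        startOrder.add(responder);
        startOrder.add(listener);
        startOrder.add(scheduler);
    }

    synchronized void start() {
        if (started) {
            return;                    // second and later calls do nothing
        }
        for (Component c : startOrder) {
            c.start();
        }
        started = true;
    }

    synchronized boolean isStarted() {
        return started;
    }
}
```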

2. Server RPC request handling

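The RpcServer's monitoring and reading of requests follows the Reactor pattern; the flowchart is shown below (taken from the referenced source).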
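In plain Java NIO, the accept half of such a Reactor looks roughly like this: one selector watches OP_ACCEPT on the server channel, and every accepted connection is handed to one of several reader slots. AcceptorSketch is hypothetical; its reader slots are plain lists standing in for Reader threads with their own selectors, and round-robin assignment is an assumption about how connections are spread across readers.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical acceptor: one selector for OP_ACCEPT, round-robin handoff to readers.
class AcceptorSketch implements AutoCloseable {
    final ServerSocketChannel acceptChannel;
    final Selector selector;
    final List<List<SocketChannel>> readers = new ArrayList<>();
    private int currentReader = 0;

    AcceptorSketch(int readThreads) throws IOException {
        acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false);
        acceptChannel.bind(new InetSocketAddress("127.0.0.1", 0));
        selector = Selector.open();
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
        for (int i = 0; i < readThreads; i++) {
            readers.add(new ArrayList<SocketChannel>());
        }
    }

    int port() {
        return acceptChannel.socket().getLocalPort();
    }

    // One pass of the accept loop; returns how many connections were accepted.
    int acceptOnce(long timeoutMs) throws IOException {
        if (selector.select(timeoutMs) == 0) {
            return 0;
        }
        int accepted = 0;
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            if (key.isValid() && key.isAcceptable()) {
                SocketChannel ch;
                // Drain every pending connection for this readiness event.
                while ((ch = acceptChannel.accept()) != null) {
                    ch.configureBlocking(false);
                    readers.get(currentReader).add(ch);
                    currentReader = (currentReader + 1) % readers.size();
                    accepted++;
                }
            }
        }
        return accepted;
    }

    @Override
    public void close() throws IOException {
        for (List<SocketChannel> r : readers) {
            for (SocketChannel c : r) {
                c.close();
            }
        }
        selector.close();
        acceptChannel.close();
    }
}
```

Splitting accept from read this way keeps a single thread on the listening socket while the per-connection reading work is spread across the reader threads.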

2.1 Listener

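The Listener owns a ServerSocketChannel named acceptChannel, which is registered with a selector for OP_ACCEPT events, and it also holds readThreads Reader threads managed by a thread pool. The Listener accepts incoming connections and hands each one to a Reader; the reading itself happens in the Reader's doRunLoop function: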

private synchronized void doRunLoop() {
  while (running) {
    try {
      readSelector.select();
      while (adding) {
        this.wait(1000);
      }

      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        if (key.isValid()) {
          if (key.isReadable()) {
            doRead(key);
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping");
      return;
    } catch (IOException ex) {
      LOG.info(getName() + ": IOException in Reader", ex);
    }
  }
}

When there are no requests, the reader thread blocks in readSelector.select(). When a request arrives and its key is valid and readable, doRead reads the data available on that connection (the actual logic is in readAndProcess). Once the data is read, it is processed in processOneRpc:

    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
      if (connectionHeaderRead) {
        processRequest(buf);
      } else {
        processConnectionHeader(buf);
        this.connectionHeaderRead = true;
        if (!authorizeConnection()) {
          // Throw FatalConnectionException wrapping ACE so client does right thing and closes
          // down the connection instead of trying to read non-existent return.
          throw new AccessDeniedException("Connection from " + this + " for service " +
            connectionHeader.getServiceName() + " is unauthorized for user: " + user);
        }
      }
    }
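processOneRpc is a two-state machine per connection: the first complete message is the connection header and every later one is a request. A sketch of that state machine together with message framing, assuming each message is preceded by a 4-byte big-endian length. The connection preamble, SASL negotiation, protobuf decoding and the authorization check are all left out; ConnectionDecoderSketch is hypothetical and treats payloads as UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-connection decoder. Bytes may arrive in arbitrary chunks;
// each message carries a 4-byte big-endian length prefix (assumed framing).
class ConnectionDecoderSketch {
    private ByteBuffer pending = ByteBuffer.allocate(0);
    private boolean connectionHeaderRead = false;
    String header;                                   // first message
    final List<String> requests = new ArrayList<>(); // every later message

    void feed(byte[] chunk) {
        // Append the new bytes to any partial message left from earlier reads.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();
        pending = merged;
        // Hand every complete message to processOneRpc and keep the rest.
        while (pending.remaining() >= 4) {
            int len = pending.getInt(pending.position());   // peek without consuming
            if (len < 0) {
                throw new IllegalStateException("Negative length: " + len);
            }
            if (pending.remaining() - 4 < len) {
                break;                                       // wait for more bytes
            }
            pending.getInt();                                // consume the prefix
            byte[] buf = new byte[len];
            pending.get(buf);
            processOneRpc(buf);
        }
    }

    // Same two-state shape as processOneRpc above.
    private void processOneRpc(byte[] buf) {
        String msg = new String(buf, StandardCharsets.UTF_8);
        if (connectionHeaderRead) {
            requests.add(msg);              // processRequest(buf) in the source
        } else {
            header = msg;                   // processConnectionHeader(buf)
            connectionHeaderRead = true;
        }
    }

    // Builds one length-prefixed message, for testing.
    static byte[] frame(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the bytes a few at a time still produces whole messages, since a message is only processed once its full length has been buffered.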

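The branch depends on whether the connection header has been read. If not, processConnectionHeader reads it first and the connection is then authorized. Once the header has been read, each subsequent message is parsed as a request and wrapped in a uniform structure, CallRunner. The CallRunner is then added to a task queue in the scheduler and handled according to the scheduling policy (FifoRpcScheduler or SimpleRpcScheduler). The core of processRequest is: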

      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
              totalRequestSize, traceInfo, RpcServer.getRemoteIp());
      scheduler.dispatch(new CallRunner(RpcServer.this, call));
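scheduler.dispatch() is where a CallRunner picks a queue. For SimpleRpcScheduler's three executors (callExecutor, priorityExecutor, replicationExecutor), the routing can be sketched as a priority check. PriorityRouterSketch is hypothetical, the priority is assumed to be already computed for each call, and the two threshold constants are illustrative values rather than figures quoted from HConstants:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical three-way router modeled on the callExecutor /
// priorityExecutor / replicationExecutor split.
class PriorityRouterSketch {
    static final int REPLICATION_PRIORITY = 5;   // illustrative value
    static final int HIGH_PRIORITY_LEVEL = 10;   // illustrative threshold

    final BlockingQueue<Runnable> callQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<Runnable> priorityQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<Runnable> replicationQueue = new LinkedBlockingQueue<>();

    // Returns false if the chosen queue rejects the task (never, for these unbounded queues).
    boolean dispatch(int priority, Runnable callRunner) {
        if (priority > HIGH_PRIORITY_LEVEL) {
            return priorityQueue.offer(callRunner);
        } else if (priority == REPLICATION_PRIORITY) {
            return replicationQueue.offer(callRunner);
        } else {
            return callQueue.offer(callRunner);      // ordinary requests
        }
    }
}
```

Giving high-priority and replication calls their own queues, each served by separate handlers, keeps a burst of ordinary requests from starving them.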

2.2 Scheduler

The scheduler defaults to SimpleRpcScheduler, which contains three RpcExecutors: callExecutor, priorityExecutor, and replicationExecutor. Most requests against user tables are executed by callExecutor, whose handlers take requests from the task queue filled above and process them. The logic is in RpcExecutor.consumerLoop:

  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
    boolean interrupted = false;
    double handlerFailureThreshhold =
        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
    try {
      while (running) {
        try {
          MonitoredRPCHandler status = RpcServer.getStatus();
          CallRunner task = myQueue.take();
          task.setStatus(status);
          try {
            activeHandlerCount.incrementAndGet();
            task.run();
          } // ... (rest of the method elided in this excerpt)

Because myQueue is a blocking queue, a handler thread blocks in myQueue.take() while there are no requests. When a request is available, the handler runs CallRunner.run(), which in turn calls RpcServer.call():

        // make the call
        resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
          call.timestamp, this.status);
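The consumerLoop pattern described above (several handler threads blocking in take() on a shared queue, counting themselves as active while a task runs) can be sketched with java.util.concurrent alone. HandlerPoolSketch is hypothetical; it swallows RuntimeExceptions from a task so the handler survives, and it leaves out the handler-failure abort threshold:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler pool: each thread blocks in take() until work arrives.
class HandlerPoolSketch {
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    final AtomicInteger activeHandlerCount = new AtomicInteger();
    private final List<Thread> handlers = new ArrayList<>();
    private volatile boolean running = true;

    HandlerPoolSketch(int handlerCount) {
        for (int i = 0; i < handlerCount; i++) {
            Thread t = new Thread(this::consumerLoop, "handler-" + i);
            t.setDaemon(true);
            handlers.add(t);
        }
    }

    void start() {
        for (Thread t : handlers) {
            t.start();
        }
    }

    private void consumerLoop() {
        boolean interrupted = false;
        try {
            while (running) {
                try {
                    Runnable task = queue.take();      // blocks while the queue is empty
                    activeHandlerCount.incrementAndGet();
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        // A failing call must not kill the handler thread.
                    } finally {
                        activeHandlerCount.decrementAndGet();
                    }
                } catch (InterruptedException e) {
                    interrupted = true;                // loop re-checks running
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();    // restore the interrupt status
            }
        }
    }

    void stop() {
        running = false;
        for (Thread t : handlers) {
            t.interrupt();                             // wake threads blocked in take()
        }
    }
}
```

An idle handler costs nothing but a parked thread, since take() blocks without spinning until the queue has work.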

RpcServer.call() first invokes the local implementation of the requested method through the blocking call service.callBlockingMethod(), and returns its result:

      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
      Message result = service.callBlockingMethod(md, controller, param);
      ...
      return new Pair<Message, CellScanner>(result, controller.cellScanner());
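The call path dispatches into the service implementation and lets the controller carry cells in and out beside the returned message. A hypothetical model of that shape: ControllerSketch, BlockingServiceSketch, DemoClientService and RpcCallSketch are illustrative, strings stand in for protobuf messages and for the CellScanner, and dispatching on a method name stands in for the MethodDescriptor:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Carries cells in and out of a call, like the controller above.
class ControllerSketch {
    List<String> cells;

    ControllerSketch(List<String> cells) {
        this.cells = cells;
    }
}

// Hypothetical stand-in for a blocking service's single dispatch entry point.
interface BlockingServiceSketch {
    String callBlockingMethod(String methodName, ControllerSketch controller, String param);
}

class DemoClientService implements BlockingServiceSketch {
    @Override
    public String callBlockingMethod(String methodName, ControllerSketch controller, String param) {
        switch (methodName) {
            case "Get":
                // Small response message; the bulky data goes into the controller.
                List<String> out = new ArrayList<>();
                out.add(param + "/cf:q=v1");
                controller.cells = out;
                return "GetResponse(exists=true)";
            default:
                throw new UnsupportedOperationException("Unknown method " + methodName);
        }
    }
}

class RpcCallSketch {
    // Runs the method, then returns the result together with the controller's cells.
    static Map.Entry<String, List<String>> call(BlockingServiceSketch service, String method,
                                                String param, List<String> requestCells) {
        ControllerSketch controller = new ControllerSketch(requestCells);
        String result = service.callBlockingMethod(method, controller, param);
        return new AbstractMap.SimpleImmutableEntry<>(result, controller.cells);
    }
}
```

Keeping cell data out of the message object is what lets the result travel as a (message, cells) pair, mirroring the Pair returned above.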

Back in CallRunner.run(), setResponse() builds the response from the result, and sendResponseIfReady() then hands it to the responder, which returns it to the client:

      // Set the response for undelayed calls and delayed calls with
      // undelayed responses.
      if (!call.isDelayed() || !call.isReturnValueDelayed()) {
        Message param = resultPair != null ? resultPair.getFirst() : null;
        CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
        call.setResponse(param, cells, errorThrowable, error);
      }

2.3 Responder

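The responder writes results back to the client. Like the Listener, it follows the Reactor pattern. A result produced once the scheduler has run a call is passed to doRespond. If the connection has no queued responses and a single channel write sends the whole response, the response is finished right there; otherwise the call is put on the connection's response queue and the connection is registered for OP_WRITE with the responder's selector.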

    void doRespond(Call call) throws IOException {
      boolean added = false;

      // If there is already a write in progress, we don't wait. This allows to free the handlers
      //  immediately for other tasks.
      if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
        try {
          if (call.connection.responseQueue.isEmpty()) {
            // If we're alone, we can try to do a direct call to the socket. It's
            //  an optimisation to save on context switches and data transfer between cores..
            if (processResponse(call)) {
              return; // we're done.
            }
            // Too big to fit, putting ahead.
            call.connection.responseQueue.addFirst(call);
            added = true; // We will register to the selector later, outside of the lock.
          }
        } finally {
          call.connection.responseWriteLock.unlock();
        }
      }

      if (!added) {
        call.connection.responseQueue.addLast(call);
      }
      call.responder.registerForWrite(call.connection);

      // set the serve time when the response has to be sent later
      call.timestamp = System.currentTimeMillis();
    }
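The point of doRespond is to let a handler thread write the response itself in the common case, and fall back to the responder only when the socket cannot take everything at once. A sketch of that decision with hypothetical classes: LimitedChannel simulates a socket that accepts only a few bytes per write, and a boolean flag stands in for registerForWrite():

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical channel that takes at most maxPerWrite bytes per write() call,
// standing in for a non-blocking socket whose send buffer fills up.
class LimitedChannel {
    final int maxPerWrite;
    private final ByteArrayOutputStream received = new ByteArrayOutputStream();

    LimitedChannel(int maxPerWrite) {
        this.maxPerWrite = maxPerWrite;
    }

    int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        byte[] chunk = new byte[n];
        src.get(chunk);
        received.write(chunk, 0, n);
        return n;
    }

    String receivedText() {
        return new String(received.toByteArray(), StandardCharsets.UTF_8);
    }
}

// Direct write when nothing is queued and the lock is free; otherwise queue it.
class ConnectionSketch {
    final ConcurrentLinkedDeque<ByteBuffer> responseQueue = new ConcurrentLinkedDeque<>();
    final ReentrantLock responseWriteLock = new ReentrantLock();
    final LimitedChannel channel;
    volatile boolean registeredForWrite = false;   // stands in for registerForWrite()

    ConnectionSketch(LimitedChannel channel) {
        this.channel = channel;
    }

    void doRespond(ByteBuffer response) {
        boolean added = false;
        if (responseQueue.isEmpty() && responseWriteLock.tryLock()) {
            try {
                if (responseQueue.isEmpty()) {
                    channel.write(response);
                    if (!response.hasRemaining()) {
                        return;                          // fully written, no selector needed
                    }
                    responseQueue.addFirst(response);    // partly written: keep it at the head
                    added = true;
                }
            } finally {
                responseWriteLock.unlock();
            }
        }
        if (!added) {
            responseQueue.addLast(response);             // ordered behind earlier responses
        }
        registeredForWrite = true;
    }
}
```

A partly written response goes to the head of the queue so its remaining bytes are sent before anything queued later, which keeps responses on one connection from interleaving.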

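registerForWrite() marks the connection as pending and wakes up writeSelector. On the next loop iteration registerWrites() registers OP_WRITE for that connection, and once its socket becomes writable the responder calls doAsyncWrite(), which calls processAllResponses() to write the queued responses. From here on the handling resembles the Listener's read path: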

          registerWrites();
          int keyCt = writeSelector.select(purgeTimeout);
          if (keyCt == 0) {
            continue;
          }

          Set<SelectionKey> keys = writeSelector.selectedKeys();
          Iterator<SelectionKey> iter = keys.iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                doAsyncWrite(key);
              }
            } catch (IOException e) {
              // ... (handling elided in this excerpt; the loop moves on to the next key)
            }
          }
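On the responder thread the Reactor shape is used for writing: a connection with queued responses is registered for OP_WRITE, and each writable event drains as much of its queue as the socket accepts. A self-contained sketch in which a java.nio Pipe stands in for the client socket. drain() and processAllResponses() are hypothetical simplifications (the second name echoes the HBase method but is not its code), and the deadline only keeps the sketch from waiting forever:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical responder-side sketch built on a selector and OP_WRITE.
class AsyncWriterSketch {
    static void drain(Pipe.SinkChannel sink, Deque<ByteBuffer> queue, long maxWaitMs)
            throws IOException {
        sink.configureBlocking(false);
        long deadline = System.currentTimeMillis() + maxWaitMs;
        try (Selector writeSelector = Selector.open()) {
            SelectionKey regKey = sink.register(writeSelector, SelectionKey.OP_WRITE);
            while (!queue.isEmpty()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new IOException("responses not written within " + maxWaitMs + " ms");
                }
                if (writeSelector.select(50) == 0) {
                    continue;                          // nothing writable yet
                }
                Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid() && key.isWritable()) {
                        processAllResponses(sink, queue);
                    }
                }
            }
            regKey.interestOps(0);                     // queue empty: stop asking for OP_WRITE
        }
    }

    // Write queued buffers in order; stop at the first one the channel cannot take fully.
    static void processAllResponses(Pipe.SinkChannel sink, Deque<ByteBuffer> queue)
            throws IOException {
        while (!queue.isEmpty()) {
            ByteBuffer head = queue.peekFirst();
            sink.write(head);
            if (head.hasRemaining()) {
                return;                                // buffer full: wait for the next OP_WRITE
            }
            queue.pollFirst();
        }
    }
}
```

Clearing the interest set once the queue is empty matters in a long-running loop: a socket with free buffer space is almost always writable, so leaving OP_WRITE registered would make select() return immediately over and over.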

3. Summary

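Using the source code, this post traced how the RpcServer's listener, readers, scheduler, and responder handle RPC requests, which gives a fairly clear picture of how the server side processes RPC. Next, I will go through the client-side RPC request logic. Onward!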

Original article: https://www.cnblogs.com/superhedantou/p/5840635.html