Flink 源码(十九):组件通信(二)源码解读

4 RPC

RPC(本地/远程)调用,底层是通过 Akka 提供的 tell/ask 方法进行通信。
Flink 中 RPC 框架中涉及的主要类:
4.1 RpcGateway
  Flink 的 RPC 协议通过 RpcGateway 来定义,主要定义通信行为;用于远程调用RpcEndpoint 的某些方法,可以理解为对方的客服端代理。
  若想与远端 Actor 通信,则必须提供地址(ip 和 port),如在 Flink-on-Yarn 模式下,JobMaster 会先启动 ActorSystem,此时 TaskExecutor 的 Container 还未分配,后面与
TaskExecutor 通信时,必须让其提供对应地址。 
从类继承图可以看到基本上所有组件都实现了 RpcGateway 接口,其代码如下: 
4.2 RpcEndpoint
  RpcEndpoint 是通信终端,提供 RPC 服务组件的生命周期管理(start、stop)。每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,
其实现了 RpcGateway 接口,其构造函数如下: 
  构造的时候调用rpcService.startServer()启动RpcServer,进入可以接收处理请求的状态,最后将 RpcServer 绑定到主线程上真正执行起来。
在 RpcEndpoint 中还定义了一些方法如 runAsync(Runnable)、callAsync(Callable, Time)方
  法来执行 Rpc 调用,值得注意的是在 Flink 的设计中,对于同一个 Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动 RpcEndpoint/进行 Rpc 调用时,其会委托
RcpServer 进行处理。 
 
4.3 RpcService 和 RpcServer
  RpcService 和 RpcServer 是 RpcEndPoint 的成员变量。
  1)RpcService 是 Rpc 服务的接口,其主要作用如下:
  ⚫ 根据提供的 RpcEndpoint 来启动和停止 RpcServer(Actor);
  ⚫ 根据提供的地址连接到(对方的)RpcServer,并返回一个 RpcGateway;
  ⚫ 延迟/立刻调度 Runnable、Callable;
  在 Flink 中实现类为 AkkaRpcService,是 Akka 的 ActorSystem 的封装,基本可以理解成 ActorSystem 的一个适配器。在 ClusterEntrypoint(JobMaster)和 TaskManagerRunner
(TaskExecutor)启动的过程中初始化并启动。
  AkkaRpcService 中封装了ActorSystem,并保存了ActorRef 到 RpcEndpoint的映射关系。RpcService 跟 RpcGateway 类似,也提供了获取地址和端口的方法。
  在构造 RpcEndpoint 时会启动指定 rpcEndpoint 上的 RpcServer,其会根据 RpcEndpoint类型(FencedRpcEndpoint 或其他)来创建不同的 AkkaRpcActor(FencedAkkaRpcActor 或
AkkaRpcActor),并将RpcEndpoint和AkkaRpcActor对应的ActorRef保存起来,AkkaRpcActor是底层 Akka 调用的实际接收者,RPC 的请求在客户端被封装成 RpcInvocation 对象,以 Akka
消息的形式发送。
  最终使用动态代理将所有的消息转发到 InvocationHandler,具体代码如下:
2)RpcServer 负责接收响应远端 RPC 消息请求,自身的代理对象。有两个实现:
⚫ AkkaInvocationHandler
⚫ FencedAkkaInvocationHandler
RpcServer 的启动是通知底层的 AkkaRpcActor 切换为 START 状态,开始处理远程调用请求: 
4.4 AkkaRpcActor
AkkaRpcActor 是 Akka 的具体实现,主要负责处理如下类型消息:
1)本地 Rpc 调用 LocalRpcInvocation
  会指派给 RpcEndpoint 进行处理,如果有响应结果,则将响应结果返还给 Sender。
2)RunAsync & CallAsync
  这类消息带有可执行的代码,直接在 Actor 的线程中执行。
3)控制消息 ControlMessages
  用来控制 Actor 行为,START 启动,STOP 停止,停止后收到的消息会丢弃掉。 
4.5 RPC 交互过程
  RPC 通信过程分为请求和响应。 
4.5.1 RPC 请求发送
  在 RpcService 中调用 connect()方法与对端的 RpcEndpoint(RpcServer)建立连接,connect()方 法 根 据 给 的 地 址 返 回InvocationHandler(AkkaInvocationHandler或FencedAkkaInvocationHandler,也就是对方的代理)。前面分析到客户端提供代理对象,代理对象会调用 AkkaInvocationHandler 的 invoke 方
法并传入 RPC 调用的方法和参数信息,代码如下:
AkkaInvocationHandler.java
  代码中判断所属的类,如果是 RPC 方法,则调用 invokeRpc 方法。将方法调用封装为RPCInvocation 消息。如果是本地则生成 LocalRPCInvocation,本地消息不需要序列化,如果
是远程调用则创建 RemoteRPCInvocation。
  判断远程方法调用是否需要等待结果,如果无需等待(void),则使用向 Actor 发送 tell 类型的消息,如果需要返回结果,则向 Acrot 发送 ask 类型的消息,代码如下: 
AkkaInvocationHandler.java

 

 

4.5.2 RPC 请求响应
  RPC 消息通过 RpcEndpoint 所绑定的 Actor 的 ActorRef 发送的,AkkaRpcActor 是消息接收的入口,AkkaRpcActor 在 RpcEndpoint 中构造生成,负责将消息交给不同的方法进行处
理。
AkkaRpcActor.java 
接收的消息有 3 种:
1)握手消息
在客户端构造时会通过 ActorSelection 发送过来。收到消息后检查接口、版本是否匹配。
AkkaRpcActor.java

 

2)控制消息
  在 RpcEndpoint 调用 start 方法后,会向自身发送一条 Processing.START 消息来转换当前 Actor 的状态为 STARTED,STOP 也类似,并且只有在 Actor 状态为 STARTED 时才
会处理 RPC 请求。
AkkaRpcActor.java
3)RPC 消息
通过解析 RpcInvocation 获取方法名和参数类型,并从 RpcEndpoint 类中找到 Method 对象,通过反射调用该方法。如果有返回结果,会以 Akka 消息的形式发送回发送者。
AkkaRpcActor.java

 

 
 
原文地址:https://www.cnblogs.com/qiu-hua/p/14502963.html