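Earlier articles covered what happens on the consumer side during initialization: how the referenced objects are injected into member fields, how the consumer registers itself with the registry, and how it subscribes to address changes from the registry. So what object is actually injected into the member field? It is the object returned by PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)). Let's first look at what this call builds.
PROXY_FACTORY.getProxy: building the proxy class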
public abstract class AbstractProxyFactory implements ProxyFactory {
private static final Class<?>[] INTERNAL_INTERFACES = new Class<?>[]{
EchoService.class, Destroyable.class
};
@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
...
// collect the service interface to be proxied
interfaces.add(invoker.getInterface());
// always add the internal EchoService and Destroyable interfaces
interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
}
}
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// create a proxy that implements the interfaces collected above; its invocation handler is InvokerInvocationHandler
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
private final Invoker<?> invoker;
private ConsumerModel consumerModel;
// store the invoker and look up the matching ConsumerModel
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
String serviceKey = invoker.getUrl().getServiceKey();
if (serviceKey != null) {
this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
}
}
}
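AbstractProxyFactory.getProxy: collects the service interface, adds the default EchoService and Destroyable interfaces to the set, and then calls getProxy(invoker, interfaces.toArray(new Class<?>[0])).
getProxy(invoker, interfaces.toArray(new Class<?>[0])): creates a proxy object that implements the interfaces collected above. Its invocation handler is InvokerInvocationHandler, so every call on the proxy is routed there.
InvokerInvocationHandler constructor: stores this.invoker and the consumerModel.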
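To make the proxy idea concrete, here is a minimal sketch that uses the JDK's java.lang.reflect.Proxy instead of Dubbo's Javassist-based Proxy class. SimpleInvoker, GreetingService, ForwardingHandler and ProxySketch are hypothetical names introduced only for illustration; they are not Dubbo types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySketch {
    // Hypothetical stand-in for Dubbo's Invoker: receives a method name and its arguments.
    public interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    // Hypothetical service interface, the type of the injected consumer field.
    public interface GreetingService {
        String greet(String name);
    }

    // Plays the role of InvokerInvocationHandler: every interface call lands here.
    public static final class ForwardingHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        public ForwardingHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared on Object are answered locally by the invoker itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Everything else is turned into an invocation and forwarded.
            return invoker.invoke(method.getName(), args);
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T createProxy(Class<T> type, SimpleInvoker invoker) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[]{type}, new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        SimpleInvoker fakeRemote = (name, a) -> name + ":" + a[0];
        GreetingService service = createProxy(GreetingService.class, fakeRemote);
        System.out.println(service.greet("dubbo")); // prints "greet:dubbo"
    }
}
```

The caller only ever sees GreetingService; the handler decides what a call really does, which is the hook Dubbo uses to start a remote invocation.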
How a method call is executed:
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
private final Invoker<?> invoker;
private ConsumerModel consumerModel;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
return invoker.invoke(rpcInvocation).recreate();
}
}
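As shown, the method ends by calling invoker.invoke(rpcInvocation).recreate(). So what is this invoker? Debugging shows that it follows the wrapper (decorator) pattern: the real invoker is wrapped layer by layer, and a call on the outermost invoker peels through the layers one at a time.
The first layer is MockClusterInvoker.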
public class MockClusterInvoker<T> implements ClusterInvoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = getUrl().getMethodParameter(invocation.getMethodName(), "mock", "false").trim();
// decide whether mock handling applies; if not, call this.invoker.invoke directly
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) { // is mock forced?
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else { // call this.invoker.invoke(invocation); if it fails with a non-business error, degrade through doMockInvoke(invocation, rpcException)
//fail-mock
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
}
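What mock does here: it first checks whether mock is enabled.
- Not enabled: call result = this.invoker.invoke(invocation) directly.
- Enabled: check whether mock is forced.
  - Forced: call doMockInvoke(invocation, null). This is mainly useful when the provider has not been developed yet and you want to test against local data.
  - Not forced: call this.invoker.invoke(invocation) first. If it fails with a non-business RpcException, the configured mock class is used to degrade the service; business exceptions are rethrown.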
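The three branches can be sketched in plain Java as follows. This is only an illustration of the decision logic, not Dubbo code: MockDecisionSketch, RemoteCallException and the two Supplier arguments are hypothetical, and the business-exception rethrow is left out for brevity.

```java
import java.util.function.Supplier;

public class MockDecisionSketch {
    // Hypothetical exception standing in for a non-business RpcException.
    public static class RemoteCallException extends RuntimeException {
        public RemoteCallException(String message) {
            super(message);
        }
    }

    // mockValue mirrors the "mock" URL parameter; remote and mock are hypothetical call targets.
    public static String invoke(String mockValue, Supplier<String> remote, Supplier<String> mock) {
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.isEmpty() || "false".equalsIgnoreCase(value)) {
            return remote.get();   // no mock: call straight through
        }
        if (value.startsWith("force")) {
            return mock.get();     // forced mock: the remote side is never touched
        }
        try {
            return remote.get();   // fail mock: try the remote call first
        } catch (RemoteCallException e) {
            return mock.get();     // degrade to the mock result on failure
        }
    }

    public static void main(String[] args) {
        Supplier<String> failing = () -> {
            throw new RemoteCallException("provider down");
        };
        Supplier<String> mock = () -> "mocked";
        System.out.println(invoke("false", () -> "real", mock));               // prints "real"
        System.out.println(invoke("force:return mocked", () -> "real", mock)); // prints "mocked"
        System.out.println(invoke("fail:return mocked", failing, mock));       // prints "mocked"
    }
}
```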
The next layer is AbstractCluster$InterceptorInvokerNode; keep stepping through in the debugger.
AbstractCluster$InterceptorInvokerNode.invoke
public abstract class AbstractCluster implements Cluster {
protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
private ClusterInterceptor interceptor;
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
interceptor.before(next, invocation);
// this is the call to follow
asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
// onError callback
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
listener.onError(e, clusterInvoker, invocation);
}
throw e;
} finally {
interceptor.after(next, invocation);
}
return asyncResult.whenCompleteWithContext((r, t) -> {
// onResponse callback
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
if (t == null) {
listener.onMessage(r, clusterInvoker, invocation);
} else {
listener.onError(t, clusterInvoker, invocation);
}
}
});
}
}
}
The code above calls interceptor.intercept(next, invocation). Here interceptor is a ConsumerContextClusterInterceptor. ConsumerContextClusterInterceptor implements ClusterInterceptor, so the call lands in the default ClusterInterceptor.intercept method. The next argument is a cluster invoker.
@SPI
public interface ClusterInterceptor {
void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
return clusterInvoker.invoke(invocation);
}
}
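So what is the clusterInvoker object passed in as the argument?
- Single registry: FailoverClusterInvoker by default.
- Multiple registries: ZoneAwareClusterInvoker by default. It is not discussed here; presumably it picks one registry through its own load-balancing step, after which the flow is the same as in the single-registry case.
Single registry, FailoverClusterInvoker.invoke: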
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
this.checkWhetherDestroyed();
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation)invocation).addObjectAttachments(contextAttachments);
}
// list all invokers available under this registry
List<Invoker<T>> invokers = this.list(invocation);
// resolve the load-balancing strategy
LoadBalance loadbalance = this.initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
return this.doInvoke(invocation, invokers, loadbalance);
}
}
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
this.checkInvokers(invokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// total number of attempts: retries (default 2) plus the first call
int len = this.getUrl().getMethodParameter(methodName, "retries", 2) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList(invokers.size());
Set<String> providers = new HashSet(len);
for(int i = 0; i < len; ++i) {
if (i > 0) {
this.checkWhetherDestroyed();
copyInvokers = this.list(invocation);
this.checkInvokers(copyInvokers, invocation);
}
// pick one invoker through load balancing
Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers(invoked);
try {
// invoke the selected invoker; everything analyzed so far was wrapping around it
Result result = invoker.invoke(invocation);
Result var13 = result;
return var13;
} catch (RpcException var18) {
if (var18.isBiz()) {
throw var18;
}
le = var18;
} catch (Throwable var19) {
le = new RpcException(var19.getMessage(), var19);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
}
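AbstractClusterInvoker.invoke:
- this.initLoadBalance(invokers, invocation): resolves the load-balancing strategy.
- this.doInvoke(invocation, invokers, loadbalance): dispatches to the subclass, here FailoverClusterInvoker.doInvoke.
What FailoverClusterInvoker.doInvoke does:
- Reads the retry count with this.getUrl().getMethodParameter(methodName, "retries", 2). With the default of 2 retries, a call is attempted at most 3 times.
- Picks an invoker through the load-balancing strategy.
- If the call fails with a non-business exception and attempts remain, it lists the invokers again and retries.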
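The retry loop can be sketched without any Dubbo types. In the sketch below, FailoverSketch and its select helper are hypothetical; select is a naive stand-in for load balancing that simply prefers a candidate that has not been tried yet, and every RuntimeException is treated as retryable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class FailoverSketch {
    // Tries at most retries + 1 candidates and returns the first successful result.
    public static <T> T invokeWithFailover(List<Supplier<T>> candidates, int retries) {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("no candidates available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<Supplier<T>> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Supplier<T> chosen = select(candidates, tried, i);
            tried.add(chosen);
            try {
                return chosen.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    // Naive stand-in for load balancing: prefer a candidate that has not been tried yet.
    private static <T> Supplier<T> select(List<Supplier<T>> candidates, List<Supplier<T>> tried, int attempt) {
        for (Supplier<T> candidate : candidates) {
            if (!tried.contains(candidate)) {
                return candidate;
            }
        }
        return candidates.get(attempt % candidates.size());
    }

    public static void main(String[] args) {
        Supplier<String> providerA = () -> {
            throw new IllegalStateException("provider A down");
        };
        Supplier<String> providerB = () -> "ok from B";
        List<Supplier<String>> providers = Arrays.asList(providerA, providerB);
        System.out.println(invokeWithFailover(providers, 2)); // prints "ok from B"
    }
}
```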
InvokerWrapper.invoke (walking the filter chain)
public class InvokerWrapper<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
}
public class ProtocolFilterWrapper implements Protocol {
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
...
}
return asyncResult.whenCompleteWithContext((r, t) -> {
...
});
}
};
}
}
return last;
}
}
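The invoker is composed as RegistryDirectory$InvokerDelegate -> ProtocolFilterWrapper -> ListenerInvokerWrapper.
Inside ProtocolFilterWrapper, the call actually goes to the invoke method of an anonymous inner class. buildInvokerChain creates one such invoker per filter, so the invocation passes through the filters one by one. As the code above shows, the filters are loaded directly through SPI with getActivateExtension.
Note: ProtocolFilterWrapper and ProtocolListenerWrapper (which produces the ListenerInvokerWrapper) are wrapper classes of Protocol. Dubbo's SPI mechanism automatically wraps the Protocol extension with them, which is why both appear in the call chain.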
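The backward loop in buildInvokerChain is easier to see in isolation. The following sketch uses hypothetical, string-based Invoker and Filter interfaces (not the Dubbo ones) and keeps only the chain-building technique: wrap from the last filter to the first, so that the first filter ends up outermost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {
    // Hypothetical, simplified counterparts of Dubbo's Invoker and Filter.
    public interface Invoker {
        String invoke(String request);
    }

    public interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wraps the target from the last filter to the first, so filters.get(0) runs first.
    public static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Filter first = (next, req) -> {
            trace.add("first");
            return next.invoke(req);
        };
        Filter second = (next, req) -> {
            trace.add("second");
            return next.invoke(req);
        };
        Invoker target = req -> {
            trace.add("target");
            return "echo:" + req;
        };
        Invoker chain = buildChain(target, Arrays.asList(first, second));
        System.out.println(chain.invoke("hi")); // prints "echo:hi"
        System.out.println(trace);              // prints "[first, second, target]"
    }
}
```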
The real Invoker call: AsyncToSyncInvoker and DubboInvoker
Finally, AsyncToSyncInvoker.invoke is called.
public class AsyncToSyncInvoker<T> implements Invoker<T> {
private Invoker<T> invoker;
public AsyncToSyncInvoker(Invoker<T> invoker) {
this.invoker = invoker;
}
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = invoker.invoke(invocation);
try {
// for a synchronous call, block on asyncResult.get until the result arrives
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}
public Invoker<T> getInvoker() {
return invoker;
}
}
public abstract class AbstractInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation inv) throws RpcException {
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addObjectAttachmentsIfAbsent(attachment);
}
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
invocation.addObjectAttachments(contextAttachments);
}
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
AsyncRpcResult asyncResult;
try {
asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
}
} catch (RpcException e) {
if (e.isBiz()) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
return asyncResult;
}
}
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// pick a client connection
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
// with several connections, pick one in round-robin order
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// one-way call (no response expected)? defaults to false
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// obtain the callback executor (a ThreadlessExecutor for synchronous calls)
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// send the request through the client
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (Exception e) {
...
}
}
// obtain the executor used for the response callback
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
return new ThreadlessExecutor(sharedExecutor);
} else {
return sharedExecutor;
}
}
}
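As for why AsyncToSyncInvoker and DubboInvoker are the ones used here: they are created when the consumer refers to the service, which the previous article covered.
The call chain is AsyncToSyncInvoker.invoke -> DubboInvoker.invoke -> DubboInvoker.doInvoke. DubboInvoker extends AbstractInvoker, so the invoke that runs is AbstractInvoker.invoke, which in turn calls DubboInvoker.doInvoke.
DubboInvoker.doInvoke:
- Picks one of the previously established client connections; if there are several, it chooses one in round-robin order.
- getCallbackExecutor: obtains the executor for the response callback according to the invoke mode.
- currentClient.request: sends the request to the provider.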
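The inner invoker always hands back a future; AsyncToSyncInvoker only decides whether the caller waits for it. A minimal sketch of that idea with CompletableFuture follows. AsyncToSyncSketch and its Mode enum are hypothetical names, and the sketch propagates the checked exceptions instead of translating them into an RpcException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncToSyncSketch {
    // Hypothetical counterpart of the invoke mode carried by the invocation.
    public enum Mode { SYNC, ASYNC }

    // pending stands in for the future returned by the inner (always asynchronous) invoker.
    public static CompletableFuture<String> invoke(Mode mode, CompletableFuture<String> pending, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (mode == Mode.SYNC) {
            // Block the calling thread until the response has arrived.
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        // In both modes the same future is returned; in SYNC mode it is already complete.
        return pending;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> "pong");
        CompletableFuture<String> result = invoke(Mode.SYNC, response, 1000);
        System.out.println(result.isDone()); // prints "true"
        System.out.println(result.join());   // prints "pong"
    }
}
```

Returning the future in both modes keeps one code path for callers: a synchronous caller reads a completed result, while an asynchronous caller attaches callbacks to it.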
ReferenceCountExchangeClient.request: sending the method call
Remember what currentClient is? It is a ReferenceCountExchangeClient that wraps a HeaderExchangeClient, i.e. ReferenceCountExchangeClient(HeaderExchangeClient()).
final class ReferenceCountExchangeClient implements ExchangeClient {
private ExchangeClient client;
public ReferenceCountExchangeClient(ExchangeClient client) {
this.client = client;
referenceCount.incrementAndGet();
this.url = client.getUrl();
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return client.request(request, timeout, executor);
}
}
public class HeaderExchangeClient implements ExchangeClient {
private final Client client;
private final ExchangeChannel channel;
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return channel.request(request, timeout, executor);
}
}
final class HeaderExchangeChannel implements ExchangeChannel {
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
// send the request
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
}
The flow here is simple; just follow it in the debugger:
ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request
HeaderExchangeChannel.request builds a Request and hands it to the channel to send. What is the Request's data? It is passed down unchanged from the callers above; tracing back shows that it is the invocation, an RpcInvocation that was created in InvokerInvocationHandler.
The call then continues. channel is an AbstractPeer, and the packet travels through AbstractPeer -> AbstractClient -> NettyChannel.send to reach the provider.
The provider receives the data
First recall where the provider starts listening: the port is opened in NettyServer.doOpen().
public class NettyServer extends AbstractServer implements RemotingServer {
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// Netty bootstrap setup; not discussed here because I have not studied Netty
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
public class NettyServerHandler extends ChannelDuplexHandler {
public NettyServerHandler(URL url, ChannelHandler handler) {
this.url = url;
this.handler = handler;
}
}
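As shown above, the message handler configured in the pipeline is NettyServerHandler, so once data arrives, NettyServerHandler.channelRead is called. The NettyServerHandler constructor stores the url and handler passed in; the handler passed in is the NettyServer itself.
NettyServerHandler.channelRead: reading the incoming message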
public class NettyServerHandler extends ChannelDuplexHandler {
private final ChannelHandler handler;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
}
}
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override
public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
}
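handler here is the NettyServer. NettyServer is a subclass of AbstractPeer, so AbstractPeer.received is called.
AbstractPeer.received first checks whether the endpoint has been closed. If it is closed, it returns immediately; otherwise it passes the message on through its own handler. Inspecting that handler in the debugger shows that the decorator pattern is used here: the message travels through a chain of handlers, each wrapping the next:
MultiMessageHandler
-> HeartbeatHandler
-> AllChannelHandler
-> DecodeHandler
-> HeaderExchangeHandler
-> DubboProtocol$requestHandler
The chain ends at the familiar DubboProtocol$requestHandler. The role of each handler:
- MultiMessageHandler: handles composite (multi-part) messages.
- HeartbeatHandler: handles heartbeat messages; it receives a heartbeat and sends the heartbeat response.
- AllChannelHandler: switches to the business thread pool; it wraps the received message in a ChannelEventRunnable task and hands it to the thread pool.
- DecodeHandler: decodes the business payload.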
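A stripped-down version of such a decorator chain might look like the sketch below. MessageHandler, HeartbeatStage and DispatchStage are hypothetical classes that only imitate the roles of HeartbeatHandler and AllChannelHandler; the "HEARTBEAT" string and the direct executor are assumptions made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class HandlerChainSketch {
    // Hypothetical, simplified counterpart of Dubbo's ChannelHandler.
    public interface MessageHandler {
        void received(Object message);
    }

    // Answers heartbeats itself and passes everything else to the next handler.
    public static class HeartbeatStage implements MessageHandler {
        private final MessageHandler next;
        private int heartbeats;

        public HeartbeatStage(MessageHandler next) {
            this.next = next;
        }

        public int heartbeatCount() {
            return heartbeats;
        }

        @Override
        public void received(Object message) {
            if ("HEARTBEAT".equals(message)) {
                heartbeats++; // handled here, never reaches business code
                return;
            }
            next.received(message);
        }
    }

    // Hands the message to an executor, in the spirit of AllChannelHandler.
    public static class DispatchStage implements MessageHandler {
        private final MessageHandler next;
        private final Executor executor;

        public DispatchStage(MessageHandler next, Executor executor) {
            this.next = next;
            this.executor = executor;
        }

        @Override
        public void received(Object message) {
            executor.execute(() -> next.received(message));
        }
    }

    public static void main(String[] args) {
        List<Object> handled = new ArrayList<>();
        Executor direct = Runnable::run; // runs tasks on the calling thread
        HeartbeatStage chain = new HeartbeatStage(new DispatchStage(handled::add, direct));
        chain.received("HEARTBEAT");
        chain.received("request-1");
        System.out.println(handled);                // prints "[request-1]"
        System.out.println(chain.heartbeatCount()); // prints "1"
    }
}
```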
HeaderExchangeHandler.received
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
// two-way request (a response is expected)?
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// the request is broken (it could not be decoded correctly)
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// handler is the anonymous inner class in DubboProtocol, held by the requestHandler field
CompletionStage<Object> future = handler.reply(channel, msg);
// when the future completes, put the result into res and send it back with channel.send(res)
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
}
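HeaderExchangeHandler.received: checks the type of the received object and handles each type differently. What we sent is a Request, so only the Request branch matters here. The logic under message instanceof Request is: for a two-way request (one that expects a response), call handleRequest(exchangeChannel, request); for a one-way request, call handler.received. Ours is a two-way request, so let's see what handleRequest does.
HeaderExchangeHandler.handleRequest:
- req.getData(): gets the data from the Request; as described above, this data is the RpcInvocation.
- handler.reply(channel, msg): handler here is the anonymous inner class in DubboProtocol.
- future.whenComplete: when the future completes, the result is put into res and sent back with channel.send(res).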
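The reply-then-send pattern in handleRequest can be sketched with the JDK's CompletionStage alone. HandleRequestSketch, its Response class, and the Function and Consumer parameters are hypothetical simplifications of the handler and channel; the status values are plain strings here rather than Dubbo's response codes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;

public class HandleRequestSketch {
    // Hypothetical, simplified response object keyed by the request id.
    public static final class Response {
        public final long id;
        public String status;
        public Object result;
        public String errorMessage;

        public Response(long id) {
            this.id = id;
        }
    }

    // replyHandler stands in for handler.reply, channel for channel.send.
    public static void handleRequest(long requestId, Object data,
                                     Function<Object, CompletionStage<Object>> replyHandler,
                                     Consumer<Response> channel) {
        Response res = new Response(requestId);
        try {
            CompletionStage<Object> future = replyHandler.apply(data);
            // The response is only filled in and sent once the business call has finished.
            future.whenComplete((value, error) -> {
                if (error == null) {
                    res.status = "OK";
                    res.result = value;
                } else {
                    res.status = "SERVICE_ERROR";
                    res.errorMessage = String.valueOf(error);
                }
                channel.accept(res);
            });
        } catch (RuntimeException e) {
            // The handler failed before it could even return a future.
            res.status = "SERVICE_ERROR";
            res.errorMessage = String.valueOf(e);
            channel.accept(res);
        }
    }

    public static void main(String[] args) {
        List<Response> sent = new ArrayList<>();
        handleRequest(1L, "ping", d -> CompletableFuture.completedFuture("pong:" + d), sent::add);
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        handleRequest(2L, "ping", d -> failed, sent::add);
        System.out.println(sent.get(0).status + " " + sent.get(0).result); // prints "OK pong:ping"
        System.out.println(sent.get(1).status);                            // prints "SERVICE_ERROR"
    }
}
```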
DubboProtocol.reply
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
// if the message is not an Invocation, throw an exception because it cannot be handled
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// look up the provider-side invoker among the exported services by key;
// this is the invoker object created during service export.
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
// for a callback invocation, check that the methods listed in invoker.getUrl() include the requested method; if not, return null
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("_isCallBackServiceInvoke"))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// perform the call on the invoker obtained above
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
}
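reply in the DubboProtocol inner class:
- getInvoker(channel, inv): looks up the provider-side invoker among the exported services by key; this is the invoker object created during service export.
- For a callback invocation, it checks whether the methods listed in invoker.getUrl() include the method name carried by inv; if not, it returns null.
- invoker.invoke: performs the call on the invoker obtained above.
What is this invoker object?
It is built during service export. The code is as follows (only the important parts are copied here):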
public class ServiceConfig<T> extends ServiceConfigBase<T> {
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
.....
// ref is the instance of the service implementation class
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
....
}
}
PROXY_FACTORY.getInvoker is where the invoker object is built. PROXY_FACTORY is obtained through Dubbo's SPI adaptive extension mechanism. JavassistProxyFactory is the default ProxyFactory implementation, so that is what we look at next.
The invoker is then wrapped in a DelegateProviderMetaDataInvoker and exported.
JavassistProxyFactory in detail
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// returns an anonymous subclass of AbstractProxyInvoker
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// this is where the matching method of the implementation class is actually called
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
Logger logger = LoggerFactory.getLogger(AbstractProxyInvoker.class);
private final T proxy;
private final Class<T> type;
private final URL url;
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// call the subclass's doInvoke
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (Exception e) {
....
}
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}
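JavassistProxyFactory.getInvoker returns an AbstractProxyInvoker.
So invoker.invoke means AbstractProxyInvoker.invoke -> doInvoke of the anonymous subclass created in JavassistProxyFactory.getInvoker, which is where the matching method of the implementation class is actually called.
Aside: how Wrapper calls the implementation's methods without reflection
Let's look at Wrapper.getWrapper, which is called inside JavassistProxyFactory.getInvoker.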
public abstract class Wrapper {
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
{
c = c.getSuperclass();
}
if (c == Object.class) {
return OBJECT_WRAPPER;
}
return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key));
}
private static Wrapper makeWrapper(Class<?> c) {
...the source strings for methods c1, c2 and c3 are concatenated here..
ClassGenerator cc = ClassGenerator.newInstance(cl);
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
cc.setSuperClass(Wrapper.class);
cc.addDefaultConstructor();
cc.addField("public static String[] pns;"); // property name array.
cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
cc.addField("public static String[] mns;"); // all method name array.
cc.addField("public static String[] dmns;"); // declared method name array.
for (int i = 0, len = ms.size(); i < len; i++) {
cc.addField("public static Class[] mts" + i + ";");
}
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try {
Class<?> wc = cc.toClass();
// setup static field.
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values()) {
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
}
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
}
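As shown, Wrapper.getWrapper calls makeWrapper.
makeWrapper concatenates the method source strings, has ClassGenerator turn them into a class through cc.toClass, and finally creates the instance with wc.newInstance.
What does the generated class look like? The important part is invokeMethod, because doInvoke in JavassistProxyFactory calls the wrapper's invokeMethod. Copied from the debugger, the generated source string is: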
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{
com.onion.service.IUserService w;
try{ w = ((com.onion.service.IUserService)$1); }
catch(Throwable e){ throw new IllegalArgumentException(e); }
try{
if( "loginUser".equals( $2 ) && $3.length == 1 ) {
return ($w)w.loginUser((java.lang.String)$4[0]); }
if( "getUser".equals( $2 ) && $3.length == 1 ) {
return ($w)w.getUser((java.lang.String)$4[0]); }
}
catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e); }
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class com.onion.service.IUserService.");
}
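Here w is our implementation object: it is $1 (the first argument, o) cast to the service type. ($w) is Javassist's wrapper cast, which boxes a primitive return value and has no effect on reference types. Each branch, such as w.loginUser(...), is an ordinary compiled method call selected by comparing the method name, which is why the implementation can be invoked without reflection.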
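Written by hand in ordinary Java, the generated method would look roughly like the sketch below. WrapperSketch and the nested UserService interface are hypothetical stand-ins for the generated Wrapper subclass and com.onion.service.IUserService, and java.lang.NoSuchMethodException replaces Dubbo's own exception type.

```java
import java.lang.reflect.InvocationTargetException;

public class WrapperSketch {
    // Hypothetical stand-in for the service interface used in the article.
    public interface UserService {
        String loginUser(String name);

        String getUser(String id);
    }

    // Dispatches by method name with plain method calls; no java.lang.reflect.Method is involved.
    public static Object invokeMethod(Object target, String name, Class<?>[] types, Object[] args)
            throws InvocationTargetException, NoSuchMethodException {
        UserService w = (UserService) target; // corresponds to casting $1 in the generated code
        try {
            if ("loginUser".equals(name) && types.length == 1) {
                return w.loginUser((String) args[0]);
            }
            if ("getUser".equals(name) && types.length == 1) {
                return w.getUser((String) args[0]);
            }
        } catch (Throwable e) {
            // Failures inside the target method are wrapped, as in the generated code.
            throw new InvocationTargetException(e);
        }
        throw new NoSuchMethodException(
                "Not found method \"" + name + "\" in class " + UserService.class.getName());
    }

    public static void main(String[] args) throws Exception {
        UserService impl = new UserService() {
            @Override
            public String loginUser(String name) {
                return "welcome " + name;
            }

            @Override
            public String getUser(String id) {
                return "user#" + id;
            }
        };
        Object result = invokeMethod(impl, "getUser", new Class<?>[]{String.class}, new Object[]{"42"});
        System.out.println(result); // prints "user#42"
    }
}
```

The price of this approach is that one such dispatcher has to be generated per wrapped class, which is what makeWrapper does at runtime with the concatenated source strings.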
That completes the walk through the whole Dubbo invocation flow.