rpc(一、基础)

RPC基础

java动态代理

netty

zookeeper

参考

https://www.jianshu.com/p/8876c9f3cd7f

参考了大佬的RPC实现

github

自己实现了一遍,还是要自己动手才能感悟很多。

https://github.com/wangkang2/rpcproject

重点学习

生产端

服务的生产端基本就是将netty服务端启动,等待客户端连接。将服务注册到zookeeper。

消费端

BeanDefinitionRegistryPostProcessor

package com.rpc.configuration;


import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class RpcBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {
    //注册更多的bean到spring中
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        String basePackages = "com.rpc.service";
        RpcClassPathBeanDefinitionScanner scanner = new RpcClassPathBeanDefinitionScanner(beanDefinitionRegistry);
        scanner.scan(StringUtils.tokenizeToStringArray(basePackages, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
    }

    //自定义扩展改变bean的定义
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }
}

spring容器初始化时,会从资源中读取bean的相关定义并保存在beanDefinitionMap中。postProcessBeanDefinitionRegistry方法通过beanDefinitionRegistry提供了对beanDefinition操作的方法,如判断、注册、移除等。

我们这里需要注入rpc接口的实现,通过ClassPathBeanDefinitionScanner扫描定义的包下面的类,进行注入。

ClassPathBeanDefinitionScanner

package com.rpc.configuration;

import com.rpc.annotation.RpcInterface;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.core.type.filter.TypeFilter;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;

/**
 * 类扫描分析器
 */
public class RpcClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner{

    private RpcFactoryBean<?> rpcFactoryBean = new RpcFactoryBean<Object>();

    public RpcClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    public Set<BeanDefinitionHolder> doScan(String... basePackages) {

        //过滤器注册
        //只有通过过滤器,这个class才能被转换成BeanDefinition注册到spring容器里
        addIncludeFilter(new TypeFilter() {
            @Override
            public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
                return true;
            }
        });
        //通过自定义注解过滤器注入
        //addIncludeFilter(new AnnotationTypeFilter(RpcInterface.class));

        Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);

        if (beanDefinitions.isEmpty()) {
            logger.warn("No RPC mapper was found in '"
                    + Arrays.toString(basePackages)
                    + "' package. Please check your configuration.");
        } else {
            processBeanDefinitions(beanDefinitions);
        }

        return beanDefinitions;
    }

    private void processBeanDefinitions(
            Set<BeanDefinitionHolder> beanDefinitions) {

        GenericBeanDefinition definition;

        for (BeanDefinitionHolder holder : beanDefinitions) {

            //获取beanDefinition实例
            definition = (GenericBeanDefinition) holder.getBeanDefinition();
            //构造方法,参数为接口类
            definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
            //设置实例化类
            definition.setBeanClass(this.rpcFactoryBean.getClass());
            //设置根据类型注入
            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
            System.out.println(holder);
        }
    }

    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
    }
}

spring类扫描器,调用的是父类的scan方法,父类又调用子类的doscan方法,子类经过过滤器又调用父类的doscan方法,完成注入。

 FactoryBean

package com.rpc.configuration;

import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Proxy;

public class RpcFactoryBean<T> implements FactoryBean<T> {

    private Class<T> rpcInterface;

    @Autowired
    RpcFactory<T> factory;

    public RpcFactoryBean(){
    }

    public RpcFactoryBean(Class<T> rpcInterface){
        this.rpcInterface = rpcInterface;
    }

    //返回对象的实例
    public T getObject() throws Exception {
        return getRpc();
    }

    //bean的类型,这里就是相对应的接口class类
    public Class<?> getObjectType() {
        return this.rpcInterface;
    }

    //true单例,false非单例
    public boolean isSingleton() {
        return true;
    }

    //通过动态代理实现对象的实例化,调用会调用invoke方法
    public <T> T getRpc() {
        return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] { rpcInterface },factory);
    }
}

实现FactoryBean接口,可以让我们自定义bean的创建过程,要满足三点:bean实例,bean类型,是否单例。

InvocationHandler

package com.rpc.configuration;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rpc.entity.Request;
import com.rpc.entity.Response;
import com.rpc.entity.User;
import com.rpc.netty.NettyClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.*;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
@Component
public class RpcFactory<T> implements InvocationHandler {

    @Autowired
    NettyClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);
        request.setRequestId(UUID.randomUUID().toString());
        //client.connection();
        Response response = JSON.parseObject(client.send(request).toString(), Response.class);
        Class<?> returnType = method.getReturnType();
        if(response.getCode()==0){
            Object data = response.getData();
            return JSONObject.parseObject(data.toString(), returnType);
        }
        return null;
    }
}

这里就是动态代理的具体实现过程,通过netty调用服务端来实现远程调用的过程。

原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/13474654.html