Pigeon的客户端调用责任链

pigeon客户端调用的责任链代码在  InvokerProcessHandlerFactory # init()

public static void init() {
        if (!isInitialized) {
            if (Constants.MONITOR_ENABLE) {
                registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
            }
            registerBizProcessFilter(new TraceFilter());
            registerBizProcessFilter(new FaultInjectionFilter());
            registerBizProcessFilter(new DegradationFilter());
            registerBizProcessFilter(new ClusterInvokeFilter());
            registerBizProcessFilter(new GatewayInvokeFilter());
            registerBizProcessFilter(new ContextPrepareInvokeFilter());
            registerBizProcessFilter(new SecurityFilter());
            registerBizProcessFilter(new RemoteCallInvokeFilter());
            bizInvocationHandler = createInvocationHandler(bizProcessFilters);
            isInitialized = true;
        }
    }

责任链在这里的作用就是对于 InvokerContext 的不断补充。比如说 ClusterInvokeFilter

public class ClusterInvokeFilter extends InvocationInvokeFilter {

    private static final Logger logger = LoggerLoader.getLogger(ClusterInvokeFilter.class);

    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
        if (cluster == null) {
            throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
        }
        return cluster.invoke(handler, invocationContext);
    }

}
public class ClusterFactory {

    private final static ConcurrentHashMap<String, Cluster> clusters = new ConcurrentHashMap<String, Cluster>();

    static {
        init();
    }

    public static void init() {
        clusters.put(Constants.CLUSTER_FAILFAST, new FailfastCluster());
        clusters.put(Constants.CLUSTER_FAILOVER, new FailoverCluster());
        clusters.put(Constants.CLUSTER_FAILSAFE, new FailsafeCluster());
        clusters.put(Constants.CLUSTER_FORKING, new ForkingCluster());
    }

    public static void registerCluster(String clusterType, Cluster cluster) {
        clusters.put(clusterType, cluster);
    }

    public static Cluster selectCluster(String clusterType) {
        Cluster cluster = clusters.get(clusterType);
        if (cluster == null) {
            return clusters.get(Constants.CLUSTER_FAILFAST);
        }
        return cluster;
    }
}

默认的是  Constants.CLUSTER_FAILFAST 

public class FailfastCluster implements Cluster {

    private ClientManager clientManager = ClientManager.getInstance();

    private static final Logger logger = LoggerLoader.getLogger(FailfastCluster.class);

    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);

        boolean timeoutRetry = invokerConfig.isTimeoutRetry();
        if (!timeoutRetry) {
            Client remoteClient = clientManager.getClient(invokerConfig, request, null);
            invocationContext.setClient(remoteClient);//让后续的发送tcp的filter能够拿到的netty的客户端
            try {
                return handler.handle(invocationContext);
            } catch (NetworkException e) {
                remoteClient = clientManager.getClient(invokerConfig, request, null);
                invocationContext.setClient(remoteClient);
                return handler.handle(invocationContext);
            }
        } else {
     ......

  

原文地址:https://www.cnblogs.com/juniorMa/p/14805871.html