dubbo 集群容错源码

  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one or more
  3  * contributor license agreements.  See the NOTICE file distributed with
  4  * this work for additional information regarding copyright ownership.
  5  * The ASF licenses this file to You under the Apache License, Version 2.0
  6  * (the "License"); you may not use this file except in compliance with
  7  * the License.  You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17 package com.alibaba.dubbo.rpc.cluster.support;
 18 
 19 import com.alibaba.dubbo.common.Constants;
 20 import com.alibaba.dubbo.common.Version;
 21 import com.alibaba.dubbo.common.logger.Logger;
 22 import com.alibaba.dubbo.common.logger.LoggerFactory;
 23 import com.alibaba.dubbo.common.utils.NetUtils;
 24 import com.alibaba.dubbo.rpc.Invocation;
 25 import com.alibaba.dubbo.rpc.Invoker;
 26 import com.alibaba.dubbo.rpc.Result;
 27 import com.alibaba.dubbo.rpc.RpcContext;
 28 import com.alibaba.dubbo.rpc.RpcException;
 29 import com.alibaba.dubbo.rpc.cluster.Directory;
 30 import com.alibaba.dubbo.rpc.cluster.LoadBalance;
 31 
 32 import java.util.ArrayList;
 33 import java.util.HashSet;
 34 import java.util.List;
 35 import java.util.Set;
 36 
 37 /**
 38  * When invoke fails, log the initial error and retry other invokers (retry n times, which means at most n different invokers will be invoked)
 39  * Note that retry causes latency.
 40  * <p>
 41  * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a>
 42  *
 43  */
 44 public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
 45 
 46     private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
 47 
 48     public FailoverClusterInvoker(Directory<T> directory) {
 49         super(directory);
 50     }
 51 
 52     @Override
 53     @SuppressWarnings({"unchecked", "rawtypes"})
 54     public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
 55         List<Invoker<T>> copyinvokers = invokers;
 56         checkInvokers(copyinvokers, invocation);
 57         int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
 58         if (len <= 0) {
 59             len = 1;
 60         }
 61         // retry loop.
 62         RpcException le = null; // last exception.
 63         List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
 64         Set<String> providers = new HashSet<String>(len);
 65         for (int i = 0; i < len; i++) {
 66             //Reselect before retry to avoid a change of candidate `invokers`.
 67             //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
 68             if (i > 0) {
 69                 checkWhetherDestroyed();
 70                 copyinvokers = list(invocation);
 71                 // check again
 72                 checkInvokers(copyinvokers, invocation);
 73             }
 74             Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
 75             invoked.add(invoker);
 76             RpcContext.getContext().setInvokers((List) invoked);
 77             try {
 78                 Result result = invoker.invoke(invocation);
 79                 if (le != null && logger.isWarnEnabled()) {
 80                     logger.warn("Although retry the method " + invocation.getMethodName()
 81                             + " in the service " + getInterface().getName()
 82                             + " was successful by the provider " + invoker.getUrl().getAddress()
 83                             + ", but there have been failed providers " + providers
 84                             + " (" + providers.size() + "/" + copyinvokers.size()
 85                             + ") from the registry " + directory.getUrl().getAddress()
 86                             + " on the consumer " + NetUtils.getLocalHost()
 87                             + " using the dubbo version " + Version.getVersion() + ". Last error is: "
 88                             + le.getMessage(), le);
 89                 }
 90                 return result;
 91             } catch (RpcException e) {
 92                 if (e.isBiz()) { // biz exception.
 93                     throw e;
 94                 }
 95                 le = e;
 96             } catch (Throwable e) {
 97                 le = new RpcException(e.getMessage(), e);
 98             } finally {
 99                 providers.add(invoker.getUrl().getAddress());
100             }
101         }
102         throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
103                 + invocation.getMethodName() + " in the service " + getInterface().getName()
104                 + ". Tried " + len + " times of the providers " + providers
105                 + " (" + providers.size() + "/" + copyinvokers.size()
106                 + ") from the registry " + directory.getUrl().getAddress()
107                 + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
108                 + Version.getVersion() + ". Last error is: "
109                 + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
110     }
111 
112 }
 1 /*
 2  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  * contributor license agreements.  See the NOTICE file distributed with
 4  * this work for additional information regarding copyright ownership.
 5  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  * (the "License"); you may not use this file except in compliance with
 7  * the License.  You may obtain a copy of the License at
 8  *
 9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package com.alibaba.dubbo.rpc.cluster.support;
18 
19 import com.alibaba.dubbo.common.Version;
20 import com.alibaba.dubbo.common.utils.NetUtils;
21 import com.alibaba.dubbo.rpc.Invocation;
22 import com.alibaba.dubbo.rpc.Invoker;
23 import com.alibaba.dubbo.rpc.Result;
24 import com.alibaba.dubbo.rpc.RpcException;
25 import com.alibaba.dubbo.rpc.cluster.Directory;
26 import com.alibaba.dubbo.rpc.cluster.LoadBalance;
27 
28 import java.util.List;
29 
30 /**
31  * Execute exactly once, which means this policy will throw an exception immediately in case of an invocation error.
32  * Usually used for non-idempotent write operations
33  *
34  * <a href="http://en.wikipedia.org/wiki/Fail-fast">Fail-fast</a>
35  *
36  */
37 public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
38 
39     public FailfastClusterInvoker(Directory<T> directory) {
40         super(directory);
41     }
42 
43     @Override
44     public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
45         checkInvokers(invokers, invocation);
46         Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
47         try {
48             return invoker.invoke(invocation);
49         } catch (Throwable e) {
50             if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
51                 throw (RpcException) e;
52             }
53             throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
54         }
55     }
56 }
 1 /*
 2  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  * contributor license agreements.  See the NOTICE file distributed with
 4  * this work for additional information regarding copyright ownership.
 5  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  * (the "License"); you may not use this file except in compliance with
 7  * the License.  You may obtain a copy of the License at
 8  *
 9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package com.alibaba.dubbo.rpc.cluster.support;
18 
19 import com.alibaba.dubbo.common.logger.Logger;
20 import com.alibaba.dubbo.common.logger.LoggerFactory;
21 import com.alibaba.dubbo.rpc.Invocation;
22 import com.alibaba.dubbo.rpc.Invoker;
23 import com.alibaba.dubbo.rpc.Result;
24 import com.alibaba.dubbo.rpc.RpcException;
25 import com.alibaba.dubbo.rpc.RpcResult;
26 import com.alibaba.dubbo.rpc.cluster.Directory;
27 import com.alibaba.dubbo.rpc.cluster.LoadBalance;
28 
29 import java.util.List;
30 
31 /**
32  * When invoke fails, log the error message and ignore this error by returning an empty RpcResult.
33  * Usually used to write audit logs and other operations
34  *
35  * <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a>
36  *
37  */
38 public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
39     private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
40 
41     public FailsafeClusterInvoker(Directory<T> directory) {
42         super(directory);
43     }
44 
45     @Override
46     public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
47         try {
48             checkInvokers(invokers, invocation);
49             Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
50             return invoker.invoke(invocation);
51         } catch (Throwable e) {
52             logger.error("Failsafe ignore exception: " + e.getMessage(), e);
53             return new RpcResult(); // ignore
54         }
55     }
56 }
  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one or more
  3  * contributor license agreements.  See the NOTICE file distributed with
  4  * this work for additional information regarding copyright ownership.
  5  * The ASF licenses this file to You under the Apache License, Version 2.0
  6  * (the "License"); you may not use this file except in compliance with
  7  * the License.  You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17 package com.alibaba.dubbo.rpc.cluster.support;
 18 
 19 import com.alibaba.dubbo.common.logger.Logger;
 20 import com.alibaba.dubbo.common.logger.LoggerFactory;
 21 import com.alibaba.dubbo.common.utils.NamedThreadFactory;
 22 import com.alibaba.dubbo.rpc.Invocation;
 23 import com.alibaba.dubbo.rpc.Invoker;
 24 import com.alibaba.dubbo.rpc.Result;
 25 import com.alibaba.dubbo.rpc.RpcException;
 26 import com.alibaba.dubbo.rpc.RpcResult;
 27 import com.alibaba.dubbo.rpc.cluster.Directory;
 28 import com.alibaba.dubbo.rpc.cluster.LoadBalance;
 29 
 30 import java.util.HashMap;
 31 import java.util.List;
 32 import java.util.Map;
 33 import java.util.concurrent.ConcurrentHashMap;
 34 import java.util.concurrent.ConcurrentMap;
 35 import java.util.concurrent.Executors;
 36 import java.util.concurrent.ScheduledExecutorService;
 37 import java.util.concurrent.ScheduledFuture;
 38 import java.util.concurrent.TimeUnit;
 39 
 40 /**
 41  * When fails, record failure requests and schedule for retry on a regular interval.
 42  * Especially useful for services of notification.
 43  *
 44  * <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
 45  *
 46  */
 47 public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
 48 
 49     private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
 50 
 51     private static final long RETRY_FAILED_PERIOD = 5 * 1000;
 52 
 53     private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));
 54     private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
 55     private volatile ScheduledFuture<?> retryFuture;
 56 
 57     public FailbackClusterInvoker(Directory<T> directory) {
 58         super(directory);
 59     }
 60 
 61     private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
 62         if (retryFuture == null) {
 63             synchronized (this) {
 64                 if (retryFuture == null) {
 65                     retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
 66 
 67                         @Override
 68                         public void run() {
 69                             // collect retry statistics
 70                             try {
 71                                 retryFailed();
 72                             } catch (Throwable t) { // Defensive fault tolerance
 73                                 logger.error("Unexpected error occur at collect statistic", t);
 74                             }
 75                         }
 76                     }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
 77                 }
 78             }
 79         }
 80         failed.put(invocation, router);
 81     }
 82 
 83     void retryFailed() {
 84         if (failed.size() == 0) {
 85             return;
 86         }
 87         for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
 88                 failed).entrySet()) {
 89             Invocation invocation = entry.getKey();
 90             Invoker<?> invoker = entry.getValue();
 91             try {
 92                 invoker.invoke(invocation);
 93                 failed.remove(invocation);
 94             } catch (Throwable e) {
 95                 logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
 96             }
 97         }
 98     }
 99 
100     @Override
101     protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
102         try {
103             checkInvokers(invokers, invocation);
104             Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
105             return invoker.invoke(invocation);
106         } catch (Throwable e) {
107             logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
108                     + e.getMessage() + ", ", e);
109             addFailed(invocation, this);
110             return new RpcResult(); // ignore
111         }
112     }
113 
114 }
  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one or more
  3  * contributor license agreements.  See the NOTICE file distributed with
  4  * this work for additional information regarding copyright ownership.
  5  * The ASF licenses this file to You under the Apache License, Version 2.0
  6  * (the "License"); you may not use this file except in compliance with
  7  * the License.  You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17 package com.alibaba.dubbo.rpc.cluster.support;
 18 
 19 import com.alibaba.dubbo.common.Constants;
 20 import com.alibaba.dubbo.common.utils.NamedThreadFactory;
 21 import com.alibaba.dubbo.rpc.Invocation;
 22 import com.alibaba.dubbo.rpc.Invoker;
 23 import com.alibaba.dubbo.rpc.Result;
 24 import com.alibaba.dubbo.rpc.RpcContext;
 25 import com.alibaba.dubbo.rpc.RpcException;
 26 import com.alibaba.dubbo.rpc.cluster.Directory;
 27 import com.alibaba.dubbo.rpc.cluster.LoadBalance;
 28 
 29 import java.util.ArrayList;
 30 import java.util.List;
 31 import java.util.concurrent.BlockingQueue;
 32 import java.util.concurrent.ExecutorService;
 33 import java.util.concurrent.Executors;
 34 import java.util.concurrent.LinkedBlockingQueue;
 35 import java.util.concurrent.TimeUnit;
 36 import java.util.concurrent.atomic.AtomicInteger;
 37 
 38 /**
 39  * Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
 40  *
 41  * <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
 42  *
 43  */
 44 public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
 45 
 46     private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));
 47 
 48     public ForkingClusterInvoker(Directory<T> directory) {
 49         super(directory);
 50     }
 51 
 52     @Override
 53     @SuppressWarnings({"unchecked", "rawtypes"})
 54     public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
 55         checkInvokers(invokers, invocation);
 56         final List<Invoker<T>> selected;
 57         final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
 58         final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
 59         if (forks <= 0 || forks >= invokers.size()) {
 60             selected = invokers;
 61         } else {
 62             selected = new ArrayList<Invoker<T>>();
 63             for (int i = 0; i < forks; i++) {
 64                 // TODO. Add some comment here, refer chinese version for more details.
 65                 Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
 66                 if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
 67                     selected.add(invoker);
 68                 }
 69             }
 70         }
 71         RpcContext.getContext().setInvokers((List) selected);
 72         final AtomicInteger count = new AtomicInteger();
 73         final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
 74         for (final Invoker<T> invoker : selected) {
 75             executor.execute(new Runnable() {
 76                 @Override
 77                 public void run() {
 78                     try {
 79                         Result result = invoker.invoke(invocation);
 80                         ref.offer(result);
 81                     } catch (Throwable e) {
 82                         int value = count.incrementAndGet();
 83                         if (value >= selected.size()) {
 84                             ref.offer(e);
 85                         }
 86                     }
 87                 }
 88             });
 89         }
 90         try {
 91             Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
 92             if (ret instanceof Throwable) {
 93                 Throwable e = (Throwable) ret;
 94                 throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
 95             }
 96             return (Result) ret;
 97         } catch (InterruptedException e) {
 98             throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
 99         }
100     }
101 }
 1 /*
 2  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  * contributor license agreements.  See the NOTICE file distributed with
 4  * this work for additional information regarding copyright ownership.
 5  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  * (the "License"); you may not use this file except in compliance with
 7  * the License.  You may obtain a copy of the License at
 8  *
 9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package com.alibaba.dubbo.rpc.cluster.support;
18 
19 import com.alibaba.dubbo.common.logger.Logger;
20 import com.alibaba.dubbo.common.logger.LoggerFactory;
21 import com.alibaba.dubbo.rpc.Invocation;
22 import com.alibaba.dubbo.rpc.Invoker;
23 import com.alibaba.dubbo.rpc.Result;
24 import com.alibaba.dubbo.rpc.RpcContext;
25 import com.alibaba.dubbo.rpc.RpcException;
26 import com.alibaba.dubbo.rpc.cluster.Directory;
27 import com.alibaba.dubbo.rpc.cluster.LoadBalance;
28 
29 import java.util.List;
30 
31 /**
32  * BroadcastClusterInvoker
33  *
34  */
35 public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
36 
37     private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
38 
39     public BroadcastClusterInvoker(Directory<T> directory) {
40         super(directory);
41     }
42 
43     @Override
44     @SuppressWarnings({"unchecked", "rawtypes"})
45     public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
46         checkInvokers(invokers, invocation);
47         RpcContext.getContext().setInvokers((List) invokers);
48         RpcException exception = null;
49         Result result = null;
50         for (Invoker<T> invoker : invokers) {
51             try {
52                 result = invoker.invoke(invocation);
53             } catch (RpcException e) {
54                 exception = e;
55                 logger.warn(e.getMessage(), e);
56             } catch (Throwable e) {
57                 exception = new RpcException(e.getMessage(), e);
58                 logger.warn(e.getMessage(), e);
59             }
60         }
61         if (exception != null) {
62             throw exception;
63         }
64         return result;
65     }
66 
67 }
原文地址:https://www.cnblogs.com/toUpdating/p/9058482.html