Load balancing strategy has no effect with Dubbo multi-group consumption

Multi-group consumption

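In Dubbo, multi-group consumption means that the consumer subscribes with a group of * or a comma-separated list such as 188,1. Some service-governance scenarios call for this form, for example cross-region deployment.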

Symptom

The consumer subscription is configured as follows:

    <dubbo:consumer check="false" timeout="60000" loadbalance="xxfirst"/>

    <dubbo:reference id="provider1"
                     interface="com.tc.dubbo.api.Provider1"
                     registry="dubbo-registry"
                     check="false"
                     group="188,1"/>

In practice the custom router still takes effect, but the configured xxfirst load balancing strategy no longer does.

Source code analysis

Inside RegistryProtocol's doRefer() method, cluster.join() creates the ClusterInvoker object. Each Cluster's logic for choosing an invoker lives in the ClusterInvoker it returns.

Dubbo's default cluster is FailoverCluster, which maps to FailoverClusterInvoker. It chooses the invoker to call through the load balancing strategy.

FailoverClusterInvoker.java

    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            // The load balancing strategy is applied here.
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            ...
        }
       ...
    }

When group is * or a list such as 188,1, however, RegistryProtocol's refer() calls doRefer() with MergeableCluster instead.

RegistryProtocol.java

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        // For multi-group references, doRefer is called with MergeableCluster.
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

    private Cluster getMergeableCluster() {
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
    }
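
The branch condition in refer() can be tried in isolation. The following is a minimal sketch, not Dubbo code: the class name GroupCheckSketch and the method isMultiGroup are hypothetical, and the regular expression is an assumption about what Constants.COMMA_SPLIT_PATTERN compiles to.

```java
import java.util.regex.Pattern;

class GroupCheckSketch {
    // Assumption: same shape as the pattern behind Constants.COMMA_SPLIT_PATTERN.
    static final Pattern COMMA_SPLIT = Pattern.compile("\\s*[,]+\\s*");

    /** Returns true when the group value would take the mergeable branch. */
    static boolean isMultiGroup(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        // Print the decision for a group list, the wildcard, and a single group.
        for (String g : new String[] {"188,1", "*", "188"}) {
            System.out.println(g + " -> " + isMultiGroup(g));
        }
    }
}
```

Under these assumptions, only a single plain group name falls through to the configured cluster. A list or the wildcard switches the reference to the mergeable cluster.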

MergeableCluster maps to MergeableClusterInvoker, and its implementation does not go through the load balancing strategy.

MergeableClusterInvoker.java

    @SuppressWarnings("rawtypes")
    public Result invoke(final Invocation invocation) throws RpcException {
        // The invokers are taken straight from the Directory; no load balancing is applied.
        List<Invoker<T>> invokers = directory.list(invocation);
        ......
        return new RpcResult(result);
    }
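
The difference between the two invokers can be illustrated without Dubbo on the classpath. This is a minimal sketch under stated assumptions: Target, Balancer and every method name are hypothetical stand-ins, not Dubbo types. The excerpt above elides what MergeableClusterInvoker does with the list, so the mergeable-style method simply calls every candidate as a placeholder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ClusterContrastSketch {
    interface Target { String call(); }                       // stand-in for Invoker
    interface Balancer { Target select(List<Target> all); }   // stand-in for LoadBalance

    /** Failover-style: the balancer decides which candidate is called. */
    static String failoverStyle(List<Target> candidates, Balancer balancer) {
        return balancer.select(candidates).call();
    }

    /** Mergeable-style: the list is used directly; the balancer is never asked. */
    static List<String> mergeableStyle(List<Target> candidates, Balancer balancer) {
        List<String> results = new ArrayList<>();
        for (Target t : candidates) {
            results.add(t.call());
        }
        return results;
    }

    /** A balancer that always picks the first candidate and counts its calls. */
    static Balancer countingFirst(AtomicInteger counter) {
        return all -> {
            counter.incrementAndGet();
            return all.get(0);
        };
    }

    static List<Target> sampleTargets() {
        return Arrays.<Target>asList(() -> "group-188", () -> "group-1");
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Balancer lb = countingFirst(calls);
        failoverStyle(sampleTargets(), lb);
        System.out.println("balancer calls after failover-style: " + calls.get());
        mergeableStyle(sampleTargets(), lb);
        System.out.println("balancer calls after mergeable-style: " + calls.get());
    }
}
```

The counter only moves on the failover-style path. This matches the symptom: a custom strategy such as xxfirst is never invoked once the reference is multi-group.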

RegistryDirectory extends AbstractDirectory, whose list() method runs the router logic, so routers still take effect.

AbstractDirectory.java

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    // Only routers with no URL or with runtime=true are applied at call time.
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }
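
The router loop in list() can be sketched in plain Java. This is a simplified illustration, not Dubbo code: addresses are plain strings, each router is a UnaryOperator, and the names RouterChainSketch, keepPrefix and failing are hypothetical. The runtime check from the original is omitted, and the sketch catches RuntimeException where the original catches Throwable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class RouterChainSketch {
    /** Applies each router in order; a failing router is logged and skipped. */
    static List<String> list(List<String> all, List<UnaryOperator<List<String>>> routers) {
        List<String> current = all;
        for (UnaryOperator<List<String>> router : routers) {
            try {
                current = router.apply(current);
            } catch (RuntimeException e) {
                System.err.println("Failed to execute router, cause: " + e.getMessage());
            }
        }
        return current;
    }

    /** Hypothetical router that keeps addresses starting with the given prefix. */
    static UnaryOperator<List<String>> keepPrefix(String prefix) {
        return in -> {
            List<String> out = new ArrayList<>();
            for (String address : in) {
                if (address.startsWith(prefix)) {
                    out.add(address);
                }
            }
            return out;
        };
    }

    /** Hypothetical router that always throws, to show the skip-on-error path. */
    static UnaryOperator<List<String>> failing() {
        return in -> {
            throw new IllegalStateException("router unavailable");
        };
    }

    public static void main(String[] args) {
        List<UnaryOperator<List<String>>> routers = new ArrayList<>();
        routers.add(keepPrefix("10."));
        routers.add(failing());
        System.out.println(list(Arrays.asList("10.0.0.1", "192.168.0.1", "10.0.0.2"), routers));
    }
}
```

Because this filtering happens inside the directory, any cluster invoker that calls directory.list() receives an already routed list. That is why the router keeps working while the load balancer does not.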
