Dubbo group merger


         

Official docs: https://dubbo.apache.org/zh/docs/advanced/group-merger/

              

                

                                  

Group merger

         

Use case: merge the data that the same interface returns from different groups into a single result.

            

Group merger configuration

# Services exposed by the provider




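# Merge the same interface across all groups
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true" />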

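# Merge only the listed groups: aaa,bbb
<dubbo:reference interface="com.xxx.MenuService" group="aaa,bbb" merger="true" />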

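# Merge only the given method (getMenuItems); other methods are not merged and call a single group
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger="true" />
</dubbo:reference>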

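# Do not merge the given method (getMenuItems); merge all other methods
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true">
    <dubbo:method name="getMenuItems" merger="false" />
</dubbo:reference>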

              

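merger values

# Merge with the built-in default merger
merger=true

# Merge by calling a method of the interface method's return type
public interface HelloService2 {
    List<String> hello2();
}
merger=".addAll": call the list's addAll method to merge the results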
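
One detail of this option is worth checking against the doInvoke source quoted later in this article: the named method is looked up with returnType.getMethod(merger, returnType), and Class.getMethod matches parameter types exactly. The following is a minimal JDK-only sketch of that lookup, not Dubbo code; DotMergerLookupSketch, Bag, and its merge method are hypothetical names invented for illustration. Under that lookup a self-typed method such as Bag.merge(Bag) resolves, whereas List declares addAll(Collection) rather than addAll(List), so the same lookup on List does not find it.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class DotMergerLookupSketch {

    /** Hypothetical return type with a self-typed merge method. */
    public static class Bag {
        public final List<String> items = new ArrayList<>();

        public Bag(String... values) {
            items.addAll(List.of(values));
        }

        public Bag merge(Bag other) {
            Bag merged = new Bag();
            merged.items.addAll(items);
            merged.items.addAll(other.items);
            return merged;
        }
    }

    /** Same lookup shape as the quoted code: getMethod(name, returnType). */
    public static boolean lookupSucceeds(Class<?> returnType, String name) {
        try {
            returnType.getMethod(name, returnType);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Folds every value into the first one by invoking the named method reflectively. */
    public static Object mergeByMethod(Class<?> returnType, String name, List<Object> values) {
        try {
            Method method = returnType.getMethod(name, returnType);
            Object result = values.get(0);
            for (Object next : values.subList(1, values.size())) {
                result = method.invoke(result, next);
            }
            return result;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Can not merge result: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupSucceeds(Bag.class, "merge"));
        System.out.println(lookupSucceeds(List.class, "addAll"));
        Bag merged = (Bag) mergeByMethod(Bag.class, "merge",
                List.of(new Bag("a"), new Bag("b"), new Bag("c")));
        System.out.println(merged.items);
    }
}
```

If the lookup in your Dubbo version matches the quoted code, a return type with its own self-typed merge method, or a named merger such as list, may be the safer choice for List results.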

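# Custom merger
Custom merger com.example.demo.merger.CustomMerger (it must implement the Merger interface)
Create a file under the resources directory: META-INF/dubbo/org.apache.dubbo.rpc.cluster.Merger,
and write the key and the fully qualified class name of the custom merger into it: custom=com.example.demo.merger.CustomMerger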

              

MergeableClusterInvoker: merged invocation on the consumer side

@SuppressWarnings("unchecked")
public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);

    public MergeableClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        // Read the merger value configured for the invoked method
        String merger = getUrl().getMethodParameter(invocation.getMethodName(), MERGER_KEY);
        if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
            for (final Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    try {
                        return invokeWithContext(invoker, invocation);
                    } catch (RpcException e) {
                        if (e.isNoInvokerAvailableAfterFilter()) {
                            log.debug("No available provider for service" + getUrl().getServiceKey() + " on group "
                                + invoker.getUrl().getGroup() + ", will continue to try another group.");
                        } else {
                            throw e;
                        }
                    }
                }
            }
            return invokeWithContext(invokers.iterator().next(), invocation);
        }

        Class<?> returnType;
        try {
            // Resolve the return type of the invoked method
            returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        // Invoke the invoker of every group to be merged (asynchronously) and keep the pending results
        Map<String, Result> results = new HashMap<>();
        for (final Invoker<T> invoker : invokers) {
            RpcInvocation subInvocation = new RpcInvocation(invocation, invoker);
            subInvocation.setAttachment(ASYNC_KEY, "true");
            results.put(invoker.getUrl().getServiceKey(), invokeWithContext(invoker, subInvocation));
        }

        Object result;

        List<Result> resultList = new ArrayList<>(results.size());

        for (Map.Entry<String, Result> entry : results.entrySet()) {
            Result asyncResult = entry.getValue();
            try {
                Result r = asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
                        " failed: " + r.getException().getMessage(), r.getException());
                } else {
                    resultList.add(r);   // collect the results that succeeded
                }
            } catch (Exception e) {
                throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
            }
        }

        if (resultList.isEmpty()) {
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else if (resultList.size() == 1) {
            return AsyncRpcResult.newDefaultAsyncResult(resultList.get(0).getValue(), invocation);
        }

        if (returnType == void.class) {
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        }

        if (merger.startsWith(".")) {   // a merger starting with "." names a method of the return type
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);  // look up the Method object by reflection
            } catch (NoSuchMethodException | NullPointerException e) {
                throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
                    returnType.getName() + " ]");
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                    && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
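                    }   // the method returns a compatible non-void type: its return value becomes the merged result
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }   // the method returns void (or an incompatible type): it merges into result in place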
                }
            } catch (Exception e) {
                throw new RpcException("Can not merge result: " + e.getMessage(), e);
            }
        } else {     // merge with a merger (built-in or custom)
            Merger resultMerger;
            ApplicationModel applicationModel = ScopeModelUtil.getApplicationModel(invocation.getModuleModel().getApplicationModel());

            if (ConfigUtils.isDefault(merger)) {
                // Default merger, looked up by return type
                resultMerger = applicationModel.getBeanFactory().getBean(MergerFactory.class).getMerger(returnType);
            } else {
                // Named merger loaded through the extension loader (this is where custom mergers are resolved)
                resultMerger = applicationModel.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List<Object> rets = new ArrayList<>(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
    }


    // Remaining methods (bodies omitted in this excerpt):
    //   public Class<T> getInterface()
    //   public boolean isAvailable()
    //   public void destroy()
    //   private String getGroupDescFromServiceKey(String key)
}
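
The control flow of doInvoke above can be sketched without Dubbo: call every group asynchronously, wait for each result, log and skip the failures, then short-circuit on zero or one result before merging. This is a simplified illustration under stated assumptions, not the Dubbo implementation. GroupFanOutSketch, collect, merge, and demo are hypothetical names, CompletableFuture stands in for Dubbo's asynchronous Result, and a failed future plays the role of a Result whose hasException() is true. A LinkedHashMap is used so the demo is deterministic; the quoted code uses a HashMap, so its merge order follows hash iteration order rather than group order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

public class GroupFanOutSketch {

    /** Starts one async call per group, then gathers the values of the calls that succeeded. */
    public static List<List<String>> collect(Map<String, Supplier<List<String>>> groups) {
        Map<String, CompletableFuture<List<String>>> pending = new LinkedHashMap<>();
        groups.forEach((group, call) -> pending.put(group, CompletableFuture.supplyAsync(call)));

        List<List<String>> succeeded = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<List<String>>> entry : pending.entrySet()) {
            try {
                succeeded.add(entry.getValue().join());
            } catch (CompletionException e) {
                // A failed group is logged and skipped instead of failing the whole call.
                System.err.println("Invoke group " + entry.getKey() + " failed: " + e.getCause());
            }
        }
        return succeeded;
    }

    /** Zero results: no value. One result: returned as is. Otherwise: concatenated. */
    public static List<String> merge(List<List<String>> results) {
        if (results.isEmpty()) {
            return null;
        }
        if (results.size() == 1) {
            return results.get(0);
        }
        List<String> merged = new ArrayList<>();
        results.forEach(merged::addAll);
        return merged;
    }

    /** Demo: group bbb fails, so only aaa and ccc contribute to the merged list. */
    public static List<String> demo() {
        Map<String, Supplier<List<String>>> groups = new LinkedHashMap<>();
        groups.put("aaa", () -> List.of("hello provider"));
        groups.put("bbb", () -> { throw new IllegalStateException("provider down"); });
        groups.put("ccc", () -> List.of("hello provider3"));
        return merge(collect(groups));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```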
 

               

Merger: the merger interface

@SPI
public interface Merger<T> {

    T merge(T... items);

}

                 

          

The META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Merger file

map=org.apache.dubbo.rpc.cluster.merger.MapMerger
set=org.apache.dubbo.rpc.cluster.merger.SetMerger
list=org.apache.dubbo.rpc.cluster.merger.ListMerger
byte=org.apache.dubbo.rpc.cluster.merger.ByteArrayMerger
char=org.apache.dubbo.rpc.cluster.merger.CharArrayMerger
short=org.apache.dubbo.rpc.cluster.merger.ShortArrayMerger
int=org.apache.dubbo.rpc.cluster.merger.IntArrayMerger
long=org.apache.dubbo.rpc.cluster.merger.LongArrayMerger
float=org.apache.dubbo.rpc.cluster.merger.FloatArrayMerger
double=org.apache.dubbo.rpc.cluster.merger.DoubleArrayMerger
boolean=org.apache.dubbo.rpc.cluster.merger.BooleanArrayMerger

              

ListMerger: the list merger

public class ListMerger implements Merger<List<?>> {

    @Override
    public List<Object> merge(List<?>... items) {
        if (ArrayUtils.isEmpty(items)) {
            return Collections.emptyList();
        }
        return Stream.of(items).filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());

    }

}
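
To see what this pipeline does with edge cases, the same stream chain can be run with the JDK alone. The sketch below is an illustration, not the Dubbo class: ListMergeSketch is a hypothetical name, and a plain null/length check replaces ArrayUtils.isEmpty. It shows that null items are skipped, elements keep argument order, and duplicates are not removed.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListMergeSketch {

    /** Same stream chain as the ListMerger above, without the Dubbo dependency. */
    public static List<Object> merge(List<?>... items) {
        if (items == null || items.length == 0) {
            return Collections.emptyList();
        }
        return Stream.of(items)
                .filter(Objects::nonNull)        // skip groups that returned null
                .flatMap(Collection::stream)     // concatenate in argument order
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of("hello provider2"), null, List.of("hello provider")));
        System.out.println(merge());
    }
}
```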
 

            

               

                                  

Usage example

           

***********

provider

         

                 

          

application.yml

dubbo:
  application:
    name: dubbo-provider
    #register-mode: instance   # In 3.0.5, registering only instance makes the consumer fail with group="*" or group="aaa,bbb"; a single group such as group="aaa" works
  registry:
    address: zookeeper://localhost:2181
    group: dubbo
    #register-mode: instance
    #register: false
  protocol:
    name: dubbo
    port: 20880

            

HelloService

public interface HelloService {

    List<String> hello();
}

        

HelloServiceImpl

@DubboService(group = "aaa")
public class HelloServiceImpl implements HelloService {

    @Override
    public List<String> hello() {
        System.out.println("hello provider");

        return Collections.singletonList("hello provider");
    }
}

         

HelloService2Impl

@DubboService(group = "bbb")
public class HelloService2Impl implements HelloService {

    @Override
    public List<String> hello() {
        return Collections.singletonList("hello provider2");
    }
}

           

HelloService3Impl

@DubboService(group = "ccc")
public class HelloService3Impl implements HelloService {

    @Override
    public List<String> hello() {
        return Collections.singletonList("hello provider3");
    }
}

          

               

***********

consumer

       

                   

            

application.yml

dubbo:
  application:
    name: dubbo-consumer
  registry:
    address: zookeeper://localhost:2181
    group: dubbo
    #register-mode: instance
  protocol:
    name: dubbo
    #port: 20880

server:
  port: 8081

          

CustomMerger

public class CustomMerger implements Merger<List<String>> {

    @Override
    public List<String> merge(List<String>... items) {
        List<String> result = new ArrayList<>();
        result.add("自定义merger:");
        Arrays.stream(items).forEach(result::addAll);

        return result;
    }
}
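
One possible hardening of this merger: the invoker quoted earlier passes the value of every successful result into merge, and ArrayList.addAll throws a NullPointerException when handed null, so a provider that returns null would break result::addAll. Whether null can actually reach the merger depends on your providers. The sketch below is a hypothetical null-tolerant variant, not the author's code. NullSafeMergerSketch and NullSafeCustomMerger are invented names, the prefix string is illustrative, and the nested Merger interface is a local stand-in with the same shape as Dubbo's, included only so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

public class NullSafeMergerSketch {

    /** Local stand-in mirroring the shape of Dubbo's Merger interface (assumption, for self-containment). */
    public interface Merger<T> {
        @SuppressWarnings("unchecked")
        T merge(T... items);
    }

    /** Hypothetical variant of CustomMerger that ignores a null array and null items. */
    public static class NullSafeCustomMerger implements Merger<List<String>> {
        @Override
        @SuppressWarnings("unchecked")
        public List<String> merge(List<String>... items) {
            List<String> result = new ArrayList<>();
            result.add("custom merger:");
            if (items == null) {
                return result;
            }
            for (List<String> item : items) {
                if (item != null) {
                    result.addAll(item);
                }
            }
            return result;
        }
    }

    /** Demo: the null in the middle is skipped instead of causing a NullPointerException. */
    @SuppressWarnings("unchecked")
    public static List<String> demo() {
        return new NullSafeCustomMerger().merge(List.of("hello provider3"), null, List.of("hello provider"));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```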
 

         

The META-INF/dubbo/org.apache.dubbo.rpc.cluster.Merger file

custom=com.example.demo.merger.CustomMerger

              

HelloController

@RestController
public class HelloController {

    @DubboReference(merger = "custom", group = "*")     // use the custom merger
    private HelloService helloService;

    @DubboReference(merger = "list", group = "aaa,bbb") // use the built-in merger: ListMerger
    private HelloService helloService2;

    @DubboReference(group = "aaa")                      // use only the service in group "aaa"; no merging
    private HelloService helloService3;

    @RequestMapping("/hello")
    public String hello(){
        helloService.hello().forEach(System.out::print);

        System.out.println();
        helloService2.hello().forEach(System.out::print);

        System.out.println();
        helloService3.hello().forEach(System.out::print);

        return "hello consumer";
    }
}

                

***********

Test run

       

Request localhost:8081/hello; the console prints:

2022-02-12 18:26:02.592  INFO 1496 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2022-02-12 18:26:02.592  INFO 1496 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2022-02-12 18:26:02.593  INFO 1496 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
自定义merger:hello provider3hello provider2hello provider
hello provider2hello provider
hello provider

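The custom merger works: the first line merges all three groups through CustomMerger, the second merges groups aaa and bbb through ListMerger, and the third comes from group aaa alone.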

               

          
