栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

dubbo 事件通知

dubbo 事件通知


dubbo 事件通知

           

官网:https://dubbo.apache.org/zh/docs/advanced/events-notify/

          

               

                                 

事件通知

          

应用:消费端调用服务端服务之前、正常返回、出现异常时,可触发对应的事件通知(oninvoke、onreturn、onthrow)

         

服务端:服务接口

interface IDemoService {
    public Person get(int id);
}

       

服务端:服务接口实现

@DubboService
public class NormalDemoService implements IDemoService {
    public Person get(int id) {
        return new Person(id, "charles`son", 4);
    }
}

         

消费端:事件通知接口

interface Notify {
    public void onreturn(Person msg, Integer id);
    public void onthrow(Throwable ex, Integer id);
}

         

消费端:事件通知接口实现

public class NotifyImpl implements Notify {
    public Map    ret    = new HashMap();
    public Map errors = new HashMap();
    
    public void onreturn(Person msg, Integer id) {
        System.out.println("onreturn:" + msg);
        ret.put(id, msg);
    }
    
    public void onthrow(Throwable ex, Integer id) {
        errors.put(id, ex);
    }
}

            

消费端:事件通知配置



      


# async:默认false,即事件通知同步执行
async=true onreturn="xxx":异步通知
async=false onreturn="xxx":同步通知
async=true:异步无回调
async=false:同步无回调

                

                   

                                 

实现原理

    

FutureFilter:消费端事件通知过滤器

@Activate(group = CommonConstants.CONSUMER)    //消费端过滤器
public class FutureFilter implements ClusterFilter, ClusterFilter.Listener {
                                               //为clusterFilter子类

    protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);

    @Override
    public Result invoke(final Invoker invoker, final Invocation invocation) throws RpcException {
        fireInvokeCallback(invoker, invocation);  //oninvoke事件在服务调用之前执行
        // need to configure if there's return value before the invocation in order to help invoker to judge if it's
        // necessary to return future.
        return invoker.invoke(invocation);
    }

    @Override
    public void onResponse(Result result, Invoker invoker, Invocation invocation) {
        if (result.hasException()) {    //响应时有异常,触发onthrow事件通知
            fireThrowCallback(invoker, invocation, result.getException());
        } else {                        //响应没有异常,触发onreturn事件通知
            fireReturnCallback(invoker, invocation, result.getValue());
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {
        fireThrowCallback(invoker, invocation, t);
    }   //有异常,触发onthrow事件通知

    private void fireInvokeCallback(final Invoker invoker, final Invocation invocation) {
                 //触发oninvoke事件通知
        final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
        if (asyncMethodInfo == null) {
            return;
        }
        final Method onInvokeMethod = asyncMethodInfo.getoninvokeMethod();  //获取调用通知方法
        final Object onInvokeInst = asyncMethodInfo.getoninvokeInstance();  //获取调用接口实现类实例

        if (onInvokeMethod == null && onInvokeInst == null) {
            return;
        }
        if (onInvokeMethod == null || onInvokeInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onInvokeMethod.isAccessible()) {
            onInvokeMethod.setAccessible(true);
        }

        Object[] params = invocation.getArguments();      //调用接口的方法参数
        try {
            onInvokeMethod.invoke(onInvokeInst, params);  //反射调用通知方法,invoke方法需跟调用的接口方法有相同的参数
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }

    private void fireReturnCallback(final Invoker invoker, final Invocation invocation, final Object result) {
        final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
        if (asyncMethodInfo == null) {
            return;
        }

        final Method onReturnMethod = asyncMethodInfo.getonreturnMethod();  //获取return方法
        final Object onReturnInst = asyncMethodInfo.getonreturnInstance();  //获取调用接口实现类实例

        //not set onreturn callback
        if (onReturnMethod == null && onReturnInst == null) {
            return;
        }

        if (onReturnMethod == null || onReturnInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onReturnMethod.isAccessible()) {
            onReturnMethod.setAccessible(true);
        }

        Object[] args = invocation.getArguments();
        Object[] params;
        Class[] rParaTypes = onReturnMethod.getParameterTypes();
        if (rParaTypes.length > 1) {     //如果事件回调接口return方法超过一个方法参数
            if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {  //如果有两个参数,并且第2个参数是Object[]数组
                                                   //设置第1个参数为服务返回结果
                                                   //第2个参数设置为调用方法参数数组
                params = new Object[2];
                params[0] = result;
                params[1] = args;
            } else {                               //否则,第1个参数设置为返回结果 
                                                   //后面的参数依次复制调用方法参数
                params = new Object[args.length + 1];
                params[0] = result;
                System.arraycopy(args, 0, params, 1, args.length);
            }   
        } else {    //如果return方法只有一个参数,直接设置为返回结果
            params = new Object[]{result};
        }
        try {
            onReturnMethod.invoke(onReturnInst, params);  //反射调用return方法
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);  //如果出现异常,调用onthrow事件通知方法
        }
    }

    private void fireThrowCallback(final Invoker invoker, final Invocation invocation, final Throwable exception) {
        final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
        if (asyncMethodInfo == null) {
            return;
        }

        final Method onthrowMethod = asyncMethodInfo.getonthrowMethod();  //获取throw方法
        final Object onthrowInst = asyncMethodInfo.getonthrowInstance();  //获取调用接口实现类实例

        //onthrow callback not configured
        if (onthrowMethod == null && onthrowInst == null) {
            return;
        }
        if (onthrowMethod == null || onthrowInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onthrowMethod.isAccessible()) {
            onthrowMethod.setAccessible(true);
        }
        Class[] rParaTypes = onthrowMethod.getParameterTypes();  //获取onthrow方法
        if (rParaTypes[0].isAssignableFrom(exception.getClass())) { //如果第一个参数是exception类或者子类
            try {
                Object[] args = invocation.getArguments();   //获取调用方法的参数
                Object[] params;

                if (rParaTypes.length > 1) {   //onThrow参数长度超过1
                    if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                                               //如果等于2,并且第2个参数是Object[]数组
                        params = new Object[2];
                        params[0] = exception; //设置第1个参数为抛出的异常
                        params[1] = args;      //第2个参数为object[]数组
                    } else {
                        params = new Object[args.length + 1];
                        params[0] = exception;
                        System.arraycopy(args, 0, params, 1, args.length);
                    }                          //否则,第1个参数是抛出的异常,后面一次复制调用方法参数
                } else {
                    params = new Object[]{exception};  //onThrow参数长度为1,参数直接设置为抛出的异常
                }
                onthrowMethod.invoke(onthrowInst, params);  //反射调用throw方法
            } catch (Throwable e) {
                logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
            }
        } else {
            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
        }
    }

    private AsyncMethodInfo getAsyncMethodInfo(Invoker invoker, Invocation invocation) {
                            //获取通知方法信息
        AsyncMethodInfo asyncMethodInfo = (AsyncMethodInfo) invocation.get(ASYNC_METHOD_INFO);
        if (asyncMethodInfo != null) {
            return asyncMethodInfo;
        }

        ServiceModel serviceModel = invocation.getServiceModel();
        if (!(serviceModel instanceof ConsumerModel)) {
            return null;
        }

        String methodName = invocation.getMethodName();
        if (methodName.equals($INVOKE)) {
            methodName = (String) invocation.getArguments()[0];
        }

        return ((ConsumerModel) serviceModel).getAsyncInfo(methodName);
    }

}

             

                

                                 

使用示例

    

**********

服务端

      

                         

          

application.yml

dubbo:
  application:
    name: dubbo-provider
    #register-mode: instance
  registry:
    address: localhost:2181
    protocol: zookeeper
    group: dubbo
  protocol:
    name: dubbo
    #port: 20880

        

HelloService

public interface HelloService {

    String hello(String name, Integer age);
}

        

HelloServiceImpl

@DubboService
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello(String name, Integer age) {
        System.out.println(name+"  "+age);

        return "hello";
    }
}

           

**********

消费端

     

                         

            

application.yml

dubbo:
  application:
    name: dubbo-consumer
  registry:
    protocol: zookeeper
    address: localhost:2181
    group: dubbo
    #register-mode: instance
  protocol:
    name: dubbo
    #port: 20880

server:
  port: 8081

          

HelloService

public interface HelloService {

    String hello(String name, Integer age);
}

          

CustomCallback

public class CustomCallback {

    public void onInvoke(String name, Integer age){
        System.out.println("远程调用参数为:"+name+" "+age);
    }

    public void onReturn(String msg){
        System.out.println("服务返回信息:"+msg);
    }

    public void onThrow(Exception e){
        System.out.println("服务调用出现异常:"+e.getMessage());
    }
}

          

HelloController

@RestController
public class HelloController {

    @Resource
    private HelloService helloService;

    @RequestMapping("/hello")
    public String hello(){
        return helloService.hello("瓜田李下", 20);
    }
}

           

DemoApplication

@EnableDubbo
@SpringBootApplication
@importResource("classpath:dubbo/consumer.xml")  //导入配置文件
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

         

dubbo/conusmer.xml




    
    
        
    

             

**********

使用测试

    

localhost:8081/hello,控制台输出:

2022-02-25 13:00:30.788  INFO 3007 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 3.092 seconds (JVM running for 3.667)
2022-02-25 13:08:34.187  INFO 3007 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2022-02-25 13:08:34.187  INFO 3007 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2022-02-25 13:08:34.188  INFO 3007 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
远程调用参数为:瓜田李下 20
服务返回信息:hello

消费端发起服务调用时,执行了对应的通知方法

         

               

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745177.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号