栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Event Bus 设计模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Event Bus 设计模式

一 点睛

消息中间件提供了系统之间的异步处理机制,比如在电商网站上支付订单之后,会触发库存计算,物流调度计算,甚至是营销人员绩效计算,报表统计等,诸如此类的操作一般会耗费比订单购买商品本身更多的时间,加之这样的操作没有即时的时效性要求,用户在下单之后完全没有必要等待电商后端做完所有操作才算成功,那么此时消息中间件是一种非常好的解决方案,用户下单支付之后即可向用户返回购买成功的通知,然后提交各种消息到消息中间件,这样注册在消息中间件的其他系统就可以顺利地接受订单通知了,然后执行各自的业务逻辑。消息中间件主要用于解决进程之间消息异步处理的解决方案。

Bus 接口:对外提供几种主要的使用方式,比如 post 方法用来发送 Event,register 方法用来注册 Evnet 接收者(Subscriber)接受相应事件,EventBus 采用同步的方式推送 Event,AsyncEventBus 采用异步的方式(Thread-Per-Message)推送 Event。

Register 注册表:主要用来记录对应 Subscriber 以及受理消息的回调方法,回调方法用注解 @Subscribe 来标识。

Dispatcher:主要用来将 event 广播给注册表中监听了 topic 的 Subscriber。

二 实战 1 Bus 接口注解
package concurrent.eventbus;


public interface Bus {
    // 将某个对象注册到 Bus 上,从此之后该类就成为了 Subscriber 了
    void register(Object subscriber);

    // 将某个对象从 Bus 上取消注册,取消注册之后就不会再接受到来自 Bus 的任何消息
    void unregister(Object subscriber);

    // 提交 Event 到默认的 topic
    void post(Object event);

    // 提交 Event 到指定的 topic
    void post(Object Event, String topic);

    // 关闭该 bus
    void close();

    // 返回 Bus 的名称标识
    String getBusName();
}
2 同步 EventBus
package concurrent.eventbus;

import java.util.concurrent.Executor;


public class EventBus implements Bus {
    // 用于维护 Subscriber 的注册表
    private final Registry registry = new Registry();
    // Event Bus 的名字
    private String busName;

    // 默认的 Event Bus 的名字
    private final static String DEFAULT_BUS_NAME = "default";

    // 默认的 Event Bus 的名字
    private final static String DEFAULT_TOPIC = "default-topic";

    // 用于分发广播消息到各个 Subscriber 的类
    private final Dispatcher dispatcher;

    public EventBus() {
        this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    public EventBus(String busName) {
        this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    public EventBus(String busName, EventExceptionHandler eventExceptionHandler, Executor executor) {
        this.busName = busName;
        this.dispatcher = Dispatcher.newDispatcher(eventExceptionHandler, executor);
    }

    public EventBus(EventExceptionHandler eventExceptionHandler) {
        this(DEFAULT_BUS_NAME, eventExceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    @Override
    public void register(Object subscriber) {
        this.registry.bind(subscriber);
    }

    @Override
    public void unregister(Object subscriber) {
        this.registry.unbind(subscriber);

    }

    @Override
    public void post(Object event) {
        this.post(event, DEFAULT_TOPIC);
    }

    @Override
    public void post(Object event, String topic) {
        this.dispatcher.dispatch(this, registry, event, topic);
    }

    @Override
    public void close() {
        this.dispatcher.close();
    }

    @Override
    public String getBusName() {
        return null;
    }
}
3 异步 EventBus
package concurrent.eventbus;

import java.util.concurrent.ThreadPoolExecutor;


public class AsyncEventBus extends EventBus {
    AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
        super(busName, exceptionHandler, executor);
    }

    AsyncEventBus(String busName, ThreadPoolExecutor executor) {
        this(busName, null, executor);
    }

    AsyncEventBus(ThreadPoolExecutor executor) {
        this("default_async", null, executor);
    }

    AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
        this("default_async", exceptionHandler, executor);
    }
}
4 注册表 Registry
package concurrent.eventbus;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;


public class Registry {
    // 存储 Subscriber 集合和 topic 之间关系的 map
    private final ConcurrentHashMap> subscriberContainer = new ConcurrentHashMap<>();

    public void bind(Object subscriber) {
        // 获取 Subscriber Object 的方法集合,然后进行绑定
        List subscribeMethods = getSubscribeMethods(subscriber);
        subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));
    }

    public void unbind(Object subscriber) {
        // unbind 为了提高速度,只对 Subscriber 进行失效操作
        subscriberContainer.forEach((key, queue) ->
                queue.forEach(s -> {
                    if (s.getSubscribeObject() == subscriber) {
                        s.setDisable(true);
                    }
                })
        );
    }

    private void tierSubscriber(Object subscriber, Method method) {
        final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);
        String topic = subscribe.topic();
        // 当某个 topic 没有 Subscriber Queue 的时候创建一个
        subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());
        // 创建一个 subscriber 并且加入 subscriber 列表中
        subscriberContainer.get(topic).add(new Subscriber(subscriber, method));
    }

    public ConcurrentLinkedQueue scanSubscriber(final String topic) {
        return subscriberContainer.get(topic);

    }

    private List getSubscribeMethods(Object subcriber) {
        final List methods = new ArrayList<>();
        Class temp = subcriber.getClass();
        // 不断获取所有的方法
        while (temp != null) {
            // 获取所有的方法
            Method[] declaredMethods = temp.getDeclaredMethods();
            // 只有 public 方法 && 有一个入参 && 被 @Subscribe 标记的方法才符合回调方法
            Arrays.stream(declaredMethods)
                    .filter(m -> m.isAnnotationPresent(Subscribe.class)
                            && m.getParameterCount() == 1
                            && m.getModifiers() == Modifier.PUBLIC)
                    .forEach(methods::add);
            temp = temp.getSuperclass();
        }
        return methods;
    }

}
5 Event 广播 Dispatch
package concurrent.eventbus;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;


public class Dispatcher {
    private final Executor executorService;
    private final EventExceptionHandler exceptionHandler;
    public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;
    public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;

    public Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {
        this.executorService = executorService;
        this.exceptionHandler = exceptionHandler;
    }

    public void dispatch(Bus bus, Registry registry, Object event, String topic) {
        // 根据 topic 获取所有的 Subscriber 列表
        ConcurrentLinkedQueue subscribers = registry.scanSubscriber(topic);
        if (null == subscribers) {
            if (exceptionHandler != null) {
                exceptionHandler.handler(new IllegalArgumentException("The topic" + topic + " note bind yet"), new BaseEventContext(bus.getBusName(), null, event));
                return;
            }
        }

        // 遍历所有的方法,并且通过反射的方式进行方法调用
        subscribers.stream()
                .filter(subscriber -> !subscriber.isDisable())
                .filter(subscriber -> {
                    Method subcribeMethod = subscriber.getSubscribeMethod();
                    Class aClass = subcribeMethod.getParameterTypes()[0];
                    return aClass.isAssignableFrom(event.getClass());
                }).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
    }

    private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {
        Method subscribeMethod = subscriber.getSubscribeMethod();
        Object subscribeObject = subscriber.getSubscribeObject();
        executorService.execute(() -> {
            try {
                subscribeMethod.invoke(subscribeObject, event);
            } catch (Exception e) {
                if (null != exceptionHandler) {
                    exceptionHandler.handler(e, new BaseEventContext(bus.getBusName(), subscriber, event));
                }
            }
        });
    }

    public void close() {
        if (executorService instanceof ExecutorService) {
            ((ExecutorService) executorService).shutdown();
        }
    }

    static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {
        return new Dispatcher(executor, exceptionHandler);
    }

    static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {
        return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
    }

    static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {
        return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);
    }

    // 顺序执行的 ExecutorService
    private static class SeqExecutorService implements Executor {
        private final static SeqExecutorService INSTANCE = new SeqExecutorService();

        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // 每个线程负责一次消息推送
    private static class PreThreadExecutorService implements Executor {
        private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();

        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // 默认 EventContext 实现
    private static class BaseEventContext implements EventContext {
        private final String eventBusName;

        private final Subscriber subscriber;

        private final Object event;

        private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {
            this.eventBusName = eventBusName;
            this.subscriber = subscriber;
            this.event = event;
        }

        @Override
        public String getSource() {
            return this.eventBusName;
        }

        @Override
        public Object getSubscriber() {
            return subscriber != null ? subscriber.getSubscribeObject() : null;
        }

        @Override
        public Method getSubscribe() {
            return subscriber != null ? subscriber.getSubscribeMethod() : null;
        }

        @Override
        public Object getEvent() {
            return this.event;
        }
    }
}
6 Subscriber 类
package concurrent.eventbus;

import java.lang.reflect.Method;


public class Subscriber {
    private final Object subscribeObject;
    private final Method subscribeMethod;
    private boolean disable = false;

    public Subscriber(Object subscribeObject, Method subscribeMethod) {
        this.subscribeObject = subscribeObject;
        this.subscribeMethod = subscribeMethod;
    }

    public Object getSubscribeObject() {
        return subscribeObject;
    }

    public Method getSubscribeMethod() {
        return subscribeMethod;
    }

    public boolean isDisable() {
        return disable;
    }

    public void setDisable(boolean disable) {
        this.disable = disable;
    }
}
7 EventExceptionHandle 接口
package concurrent.eventbus;


public interface EventExceptionHandler {
    void handler(Throwable cause, EventContext context);
}
8 EventContext
package concurrent.eventbus;

import java.lang.reflect.Method;


public interface EventContext {
    String getSource();

    Object getSubscriber();

    Method getSubscribe();

    Object getEvent();
}
9 Subscribe 注解
package concurrent.eventbus;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
    String topic() default "default-topic";
}
10 SimpleSubscriber1
package concurrent.eventbus;


public class SimpleSubscriber1 {
    @Subscribe
    public void method1(String message){
        System.out.println("==SimpleSubscriber1==method1=="+message);
    }

    @Subscribe(topic = "test")
    public void method2(String message){
        System.out.println("==SimpleSubscriber1==method2=="+message);
    }
}
11 SimpleSubscriber2
package concurrent.eventbus;


public class SimpleSubscriber2 {
    @Subscribe
    public void method1(String message){
        System.out.println("==SimpleSubscriber2==method1=="+message);
    }

    @Subscribe(topic = "test")
    public void method2(String message){
        System.out.println("==SimpleSubscriber2==method2=="+message);
    }
}
12  同步 Event Bus 测试类
package concurrent.eventbus;

public class SyncTest {
    public static void main(String[] args) {
        Bus bus = new EventBus("Test");
        bus.register(new SimpleSubscriber1());
        bus.register(new SimpleSubscriber2());
        bus.post("Hello");
        System.out.println("---------");
        bus.post("Hello", "test");
    }
}
13 异步 Event Bus 测试类
package concurrent.eventbus;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ASyncTest {
    public static void main(String[] args) {
        Bus bus = new AsyncEventBus("Test", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));
        bus.register(new SimpleSubscriber1());
        bus.register(new SimpleSubscriber2());
        bus.post("Hello");
        System.out.println("---------");
        bus.post("Hello", "test");
    }
}
三 测试结果 1 同步 Event Bus 测试结果

==SimpleSubscriber1==method1==Hello

==SimpleSubscriber2==method1==Hello

---------

==SimpleSubscriber1==method2==Hello

==SimpleSubscriber2==method2==Hello

2 异步  Event Bus 测试结果

---------

==SimpleSubscriber2==method1==Hello

==SimpleSubscriber1==method1==Hello

==SimpleSubscriber1==method2==Hello

==SimpleSubscriber2==method2==Hello

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/881376.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号