一、观察者模式
1. 观察者模式基本概念2. 观察者模式的应用场景3. 观察者模式的类图 二、设计异步多渠道群发框架
2.1. 定义消息观察者抽象接口2.2. 创建观察者2.3. 主题通知所有观察者2.4. 观察者注册2.5. 自定义线程池2.6. 签单通知入口2.6. 异步通知接口测试2.7. 依赖 三、Spring事件通知
3.1. 定义消息实体类3.2. 定义(邮件)事件通知3.3. 定义(短信)事件通知3.4. 签单同步通知入口3.5. 测试效果3.6. 开源项目
一、观察者模式 1. 观察者模式基本概念一个对象状态改变,通知给其他所有的对象
2. 观察者模式的应用场景Zk的事件监听、分布式配置中心刷新配置文件、业务中群发不同渠道消息
3. 观察者模式的类图 二、设计异步多渠道群发框架 2.1. 定义消息观察者抽象接口package com.gblfy.observer;
import com.alibaba.fastjson.JSONObject;
public interface GblfyObServer {
void sendMsg(JSonObject jsonObject);
}
2.2. 创建观察者
短信观察者
package com.gblfy.observer.impl;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfyObServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SmsObServer implements GblfyObServer {
@Override
@Async("customAsyncThreadPool")
public void sendMsg(JSonObject jsonObject) {
log.info("观察者模式发送->短信-->{}", jsonObject.toJSONString());
}
}
邮件观察者
package com.gblfy.observer.impl;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfyObServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class EmailObServer implements GblfyObServer {
@Override
@Async("customAsyncThreadPool")
public void sendMsg(JSonObject jsonObject) {
log.info("观察者模式发送->邮件-->{}",jsonObject.toJSONString());
}
}
2.3. 主题通知所有观察者
package com.gblfy.observer;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class GblfySubject {
//观察者容器
private List obServerList = new ArrayList<>();
public void addObServer(GblfyObServer gblfyObServer) {
obServerList.add(gblfyObServer);
}
public void notifyObServer(JSonObject jsonObject) {
obServerList.stream().forEach(p -> p.sendMsg(jsonObject));
}
}
2.4. 观察者注册
项目启动自动注册观察者
package com.gblfy.start;
import com.gblfy.observer.GblfyObServer;
import com.gblfy.observer.GblfySubject;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class StartService implements ApplicationRunner, ApplicationContextAware {
@Autowired
private GblfySubject gblfySubject;
//初始化上下文对象
private ApplicationContext applicationContext;
@Override
public void run(ApplicationArguments args) throws Exception {
Map map = applicationContext.getBeansOfType(GblfyObServer.class);
for (String key : map.keySet()) {
GblfyObServer gblfyObServer = map.get(key);
gblfySubject.addObServer(gblfyObServer);
}
}
//获取上下文
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
2.5. 自定义线程池
package com.gblfy.config;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Component
@EnableAsync
public class AsyncScheduledTaskConfig {
@Bean("customAsyncThreadPool")
public Executor customAsyncThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(100);
//核心线程数
executor.setCorePoolSize(10);
//任务队列的大小
executor.setQueueCapacity(10);
//线程池名的前缀
executor.setThreadNamePrefix("gblfy-signpolicy-asynnotify-");
//允许线程的空闲时间30秒
executor.setKeepAliveSeconds(30);
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//线程初始化
executor.initialize();
return executor;
}
}
2.6. 签单通知入口
package com.gblfy.controller;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfySubject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class SignPolicyController {
@Autowired
private GblfySubject gblfySubject;
@GetMapping("/signPolicyToAsynNotify")
public String signPolicy() {
log.info("将签单信息保存数据库处理");
JSonObject jsonObject = new JSONObject();
jsonObject.put("sms", "1766666666");
jsonObject.put("email", "1766@163.com");
gblfySubject.notifyObServer(jsonObject);
return "success";
}
}
2.6. 异步通知接口测试
http://localhost:8080/signPolicyToAsynNotify2.7. 依赖
org.apache.commons
commons-lang3
3.12.0
com.alibaba
fastjson
1.2.79
org.springframework.boot
spring-boot-starter-web
三、Spring事件通知
3.1. 定义消息实体类
package com.gblfy.entity;
import org.springframework.context.ApplicationEvent;
public class SignPolicyMsgEntity extends ApplicationEvent {
private String email;
private String phone;
private String userId;
public SignPolicyMsgEntity(Object source) {
super(source);
}
public SignPolicyMsgEntity(Object source, String email, String phone) {
super(source);
this.email = email;
this.phone = phone;
}
@Override
public String toString() {
return "email:" + email + ",phone:" + phone;
}
}
3.2. 定义(邮件)事件通知
package com.gblfy.listener; import com.gblfy.entity.SignPolicyMsgEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @Component @Slf4j public class EmailListener implements ApplicationListener3.3. 定义(短信)事件通知{ @Override public void onApplicationEvent(SignPolicyMsgEntity event) { log.info("eamil:->{}", event.toString()); } }
package com.gblfy.listener; import com.gblfy.entity.SignPolicyMsgEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @Component @Slf4j public class SmsListener implements ApplicationListener3.4. 签单同步通知入口{ @Override public void onApplicationEvent(SignPolicyMsgEntity event) { log.info("sms:->{}", event.toString()); } }
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@GetMapping("/signPolicyToSyncNotify")
public String signPolicyToSyncNotify() {
log.info("将签单信息保存数据库处理");
SignPolicyMsgEntity signPolicyMsgEntity = new SignPolicyMsgEntity(this, "1766@163.com", "1766666666");
applicationEventPublisher.publishEvent(signPolicyMsgEntity);
return "success";
}
3.5. 测试效果
http://localhost:8080/signPolicyToSyncNotify3.6. 开源项目
https://gitee.com/gblfy/design-pattern/tree/observer-mode/



