对于消息客户端,需要将请求消息推送到服务端,服务端对该消息进行应答。
例如:
系统场景:客户端需要推送一条登录请求消息至服务端,而服务端推送一条登录响应消息至客户端。
业务场景:客户端需要推送一条委托单创建消息至服务端,而服务端推送一条确认消息至客户端。
与收到消息的处理器类似,有大量的逻辑代码是相同或相似的,这里,我们实现一个消息发送器的抽象基类,将公用代码抽取出来复用,留置部分可被子类覆写的方法,用于实现具体的业务逻辑即可。
发送消息有两种情况,一是消息新创建的时候,这时候消息标识直接使用请求消息构造方法中生成的标识即可,二是消息重发,这时候,我们需要读取原有的消息标识,即重发过程中保持消息不变,这样服务端才能根据消息标识去重。
发送消息的操作不算复杂,无非是构造消息,发送消息,保存日志。
这里面有个问题需要思考,发送器与消息的关系是什么?
发送器相当于业务消息的封装,发送的肯定是一条请求消息,消息的参数是通过发送器属性设置,还是调用发送方法时作为方法参数处理?
实际上,这两种方式都能实现我们的目的,具体分析如下:
对于发送器对应的消息主题,我们希望具体的发送器直接内置固化,不能交由外部传入,否则会造成混乱,因此使用构造参数的方法来设置。
对于消息标识,有两种场景,一是消息新创建的时候,这时候消息标识直接使用请求消息构造方法中生成的标识即可,二是消息重发,这时候,我们需要读取原有的消息标识,即重发过程中保持消息不变,这样服务端才能根据消息标识去重。因此消息标识这个参数可以在调用方法时传入。
对于消息内容,同样有两种情况,一种是对应业务事件,包括标识、事件,另外一种,则对应部分技术框架部分的操作,如登录,需要将账号和密钥构建一个json对象传入。
综上考虑,我们设置两个属性,消息主题和消息内容,其中消息主题为私有属性,仅允许通过构造函数传入,消息内容则设置了一个保护方法,允许被子类调用。
同时,为方便灵活调用,我们设置了多个重载方法来发送消息,使用方可以通过方法参数,传入事件的标识和描述,内部实现消息体的构建,最大程度的进行复用。
发送器基类实现
@Slf4j
public abstract class baseSender {
private String topic;
private String content;
protected void setContent(String content) {
this.content = content;
}
public baseSender(String topic){
this.topic=topic;
}
protected MessageClientGlobalHolder config= SpringUtil.getBean(MessageClientGlobalHolder.class);
protected ApiMessageLogService apiMessageLogService= SpringUtil.getBean(ApiMessageLogService.class);
public void sendMessage(){
sendMessage(null);
}
public void sendMessage(String id){
sendMessage(content,id);
}
public void sendMessage(String eventId,String event, String id) {
baseEvent baseEvent=new baseEvent();
baseEvent.setId(eventId);
baseEvent.setEvent(event);
String content= JSON.toJSONString(event);
sendMessage(content,id);
}
public void sendMessage(String content, String id)
{
// 生成请求消息
RequestMessage message = new RequestMessage();
// 使用已有ID重置默认生成的ID,用于关联消息
if(StringUtils.isNotBlank(id)){
message.setId(id);
}
message.setTopic(topic);
message.setContent(content);
message.setPublishAppCode(config.getAppCode());
//设置状态为等待发送
message.setStatus(MessageStatus.WAIT_REQUEST.name());
Channel channel= config.getChannel();
if(channel!=null && channel.isActive()){
ChannelFuture channelFuture = channel.writeAndFlush(message);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//设置状态
message.setStatus(MessageStatus.REQUESTED.name());
message.setSendCount(message.getSendCount() + 1);
//保存日志
saveLog(message);
}
});
}else{
//保存日志
saveLog(message);
}
}
protected void saveLog(RequestMessage message) {
ApiMessageLog log = new ApiMessageLog();
log.setRequestId(message.getId());
log.setRequestAppCode(message.getPublishAppCode());
log.setRequestTopicCode(message.getTopic());
log.setRequestTime(LocalDateTime.now());
log.setRequestData(message.getContent());
log.setResponseAppCode(message.getResponseAppCode());
log.setStatus(message.getStatus());
log.setSendCount(message.getSendCount());
apiMessageLogService.add(log);
}
}
登录请求发送器
public class LoginRequestSender extends baseSender {
public LoginRequestSender() {
super("framework.login.request");
//使用账号密钥构建消息内容
JSONObject jsonObject = new JSONObject();
jsonObject.put("appCode", config.getAppCode());
jsonObject.put("appSecret", config.getAppSecret());
super.setContent(jsonObject.toJSONString());
}
}
委托单创建发送器
public class ConsignmentBillCreateSender extends baseSender {
public ConsignmentBillCreateSender(){
super("lms.transportbill.consignmentbill.create");
}
}
进一步思考,实际我们的消息主题是细颗粒度的,对于一张单据的创建、修改、作废,是对应三个主题而不是一个。如果将单据设计为1个消息主题,内部在区分具体的事件,则存在诸多问题,一是难以进行细粒度的权限控制,二是会在内部产生诸多的条件分支,并在在新增事件时需要进行调整而不是扩展。
同样,基于消息主题的细颗粒度,实际我们可以在消息内容里直接放一个业务单据的标识而不是是一个事件对象,这样反而更简便,现有的baseSender类不需要调整,直接调用除 void sendMessage(String eventId,String event, String id)方法以外的其他三个重载方法,均可以实现目的。



