栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

一种可扩展的消息路由实现方式,kafka基本原理及leader

一种可扩展的消息路由实现方式,kafka基本原理及leader

@Builder

@Getter

@ToString

public class Message {

private String event;

private String msgType;

private String content;

private String fromUser;

private String toUser;

private Instant createTime;

//… as you wish

}

待消费消息对象包含一些Trait字段,包括event,msgType,content。这些字段既可以作为路由规则匹配的依据,也能作为具体消息处理的条件。可根据不同的业务类型去扩展。

消息处理结果

@Builder

@ToString

public class OutMessage {

private String msgType;

private String fromUser;

private String toUser;

private Instant createTime;

}

消息处理结果,根据具体业务自行定义吧。

3.2 消息处理器

抽象类型

public interface MessageHandler {

OutMessage handle(Message message, Map context);

}

这是所有消息处理器的抽象类型,自定义的处理器都必须实现它。简单实现一个消息日志记录处理器。

@Component

public class LogMessageHandler implements MessageHandler {

@Override

public OutMessage handle(Message message, Map context) {

System.out.println(message.toString());

// define your return value

return null;

}

}

3.2 路由相关

消息拦截器

public interface MessageInterceptor {

OutMessage handle(Message message, Map context);

}

拦截器可增强对消息处理。自行实现此接口。

消息匹配器

public interface MessageRouterMatcher {

boolean match(Message message);

}

匹配器可实现对消息的过滤,以实现对消息的规则匹配。

路由器

public class MessageRouter {

@Getter

private final List rules = new ArrayList<>();

public MessageRouterRule rule(){

return new MessageRouterRule(this);

}

private OutMessage route(Message message,Map context){

final List matchRules = new ArrayList<>();

final Iterator iterator = this.rules.iterator();

while (iterator.hasNext()){

final MessageRouterRule rule = iterator.next();

if (rule.test(message)){

matchRules.add(rule);

}

}

if(matchRules.size() == 0){

return null;

}else{

final Iterator matchIterator = matchRules.iterator();

while (matchIterator.hasNext()){

final MessageRouterRule rule = matchIterator.next();

//think think multi OutMessage

return rule.service(message, context);

}

}

return null;

}

public OutMessage route(Message message){

return this.route(message,new HashMap<>(2));

}

}

消息路由规则

public class MessageRouterRule {

//是否异步处理消息

private boolean async;

private String event;

private String msgType;

private String content;

private String fromUser;

private String toUser;

private MessageRouter router;

private MessageRouterMatcher matcher;

private List handlers = new ArrayList<>();

private List interceptors = new ArrayList

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

<>();

public MessageRouterRule async(boolean async){

this.async = async;

return this;

}

public MessageRouterRule msgType(String msgType){

this.msgType = msgType;

return this;

}

public MessageRouterRule event(String event){

this.event = event;

return this;

}

public MessageRouterRule content(String content){

this.content= content;

return this;

}

public MessageRouterRule fromUser(String fromUser){

this.fromUser= fromUser;

return this;

}

public MessageRouterRule toUser(String toUser){

this.toUser= toUser;

return this;

}

public MessageRouterRule handler(MessageHandler handler,MessageHandler… otherHandlers){

this.handlers.add(handler);

if(otherHandlers != null && otherHandlers.length>0){

Collections.addAll(this.handlers,otherHandlers);

}

return this;

}

public MessageRouterRule handle(MessageHandler handler){

return this.handler(handler,(MessageHandler[]) null);

}

public MessageRouter end(){

this.router.getRules().add(this);

return this.router;

}

protected boolean test(Message message){

//here can use matcher

return (this.fromUser == null || this.fromUser.equals(message.getFromUser())) && (this.msgType == null || this.msgType.toLowerCase().equals(message.getMsgType() == null ? null : message.getMsgType().toLowerCase())) && (this.event == null || this.event.toLowerCase().equals(message.getEvent() == null ? null : message.getEvent().toLowerCase())) && (this.content == null || this.content.equals(message.getContent() == null ? null : message.getContent().trim())) && (this.matcher == null || this.matcher.match(message));

}

public MessageRouterRule(MessageRouter router){

this.router = router;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651675.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号