栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

仿淘宝开放平台之消息服务——消息中心推送消息至消费者

仿淘宝开放平台之消息服务——消息中心推送消息至消费者

消息中心收到生产者推送过来的业务消息外,一方面,需要给生产者推送一条消息确认的响应消息;另一方面,则需要根据消息主题,查找所有订阅该主题的消费者(实际就是消息客户端),将消息复制及转发出去。

这种情况下,实际消息中心在消息的生产者和消费者中间,充当了经纪人的角色,生产者和消费者只需要跟消息中心交互就行了,并不知道对方的存在,也就是实现生产者和消费者的解耦。

下面我们来实现从消息中心推送业务消息至消费者的过程。

以下方法是消息中心处理请求消息的过程,前面几个步骤记录日志、验证框架、设置消息状态、该处理器的个性化处理和发送响应,前面都提到过,而本节的主角是repostMessage方法。

    
    public void handleMessage(RequestMessage requestMessage, Channel channel) {

        // 记录消息请求日志
        apiMessageLogService.createRequestPart(requestMessage);

        //验证框架
        validateframework(requestMessage);

        //将请求消息状态默认设置为无需发送
        apiMessageLogService.updateStatus(MessageStatus.NOT_TO_REQUEST.name(),requestMessage.getId());


        //特殊处理
        messageOperation(requestMessage, channel);

        //发送响应
        sendResponse(channel, MessageResponseResultEnum.SUCCESS.name(), "", requestMessage.getId(),
                requestMessage.getTopic());
        //消息处理(复制及转发)
        repostMessage(requestMessage);
    }

repostMessage方法内容如下:

    
    protected void repostMessage(RequestMessage requestMessage) {
        //查找订阅
        String topic = requestMessage.getTopic();
        List subscriberList = apiMessageSubscriptionService.getSubscriberList(topic);
        if (CollectionUtils.isEmpty(subscriberList)) {
            return;
        }
        //创建请求消息
        RequestMessage message = new RequestMessage();
        message.setTopic(topic);
        message.setContent(requestMessage.getContent());
        	message.setPublishAppCode(appConfig.getApiPlatformMessage().getMessageServerAppCode());
        //设置状态为等待发送
        message.setStatus(MessageStatus.WAIT_REQUEST.name());
        //遍历订阅者,发送消息
        subscriberList.stream().forEach(appCode -> {
            //通过原型模式复制消息,更改响应应用编码
            RequestMessage newMessage = message.clone();
            newMessage.setResponseAppCode(appCode);
            //数据权限过滤
            boolean hasDataPermission = dataPermissionFilter(requestMessage, appCode);
            if (hasDataPermission) {
                //发送消息
                sendMessage(newMessage);
            }

        });

    }

查找订阅主题的消息客户端以及构建转发消息逻辑比较简单,没什么好说的。

一个消息主题可能有多个消息客户端订阅,又分为两种情况,一是这些消息客户端能收到所有消息,即不需要控制数据权限,例如将所有的运输任务单创建消息发送给做在途跟踪的服务商;二是某个特定的消息客户端,只能收到其中一部分消息,需要进行数据权限过滤,例如委托单指定了承运商,我们应该将委托单创建消息,只发送给被委托的这家承运商。

基于数据权限的考虑,我们增加了dataPermissionFilter方法,可被子类覆写,实现数据权限的过滤与控制。

    
    protected boolean dataPermissionFilter(RequestMessage message,String appCode){
        //默认返回true,不进行数据权限控制,可被需要进行数据权限控制的子类覆写
        return true;
    }

委托单创建消息的数据权限控制示例

    @Override
    protected boolean dataPermissionFilter(RequestMessage message, String appCode) {
        //获取业务单据标识
        String id = message.getContent();
        //通过api调用,获取该业务单据的承运商编码,此处模拟为001
        String carrierCode="001";

        //查找当前应用拥有的承运商数据角色列表
        List list = apiDataPermissionService.getPermissionByRoleCode(DataRoleEnum.CARRIER.name(), appCode);

        AtomicBoolean hasPermission= new AtomicBoolean(false);
        list.stream().forEach(x->{
            //如数据权限记录的业务编码与单据编码一致,或者使用了通配符,则有权限
            if(x.getBusinessCode().equals(carrierCode) || x.getBusinessCode().equals(DATA_PERMISSION_ALL)){
                hasPermission.set(true);
                return;
            }

        });

        return hasPermission.get();
    }

另外一个需要注意的地方是发送消息的处理。因为我们规划设计,与消息中心对接的模式有两种,一种是基于消息客户端模式,通过建立websocket长连接,实时推送,另外一种是允许客户端以定时轮询的方式,通过Restful api来查询待处理的消息和进行消息确认,所以发送消息这个环节需要判断下对接模式,对于两种模式采取不同的处理逻辑,具体如下:

   
    protected void sendMessage(RequestMessage message) {
        String appCode = message.getResponseAppCode();

        //获取对接模式
        String integrationModel = apiAppService.getIntegrationModelByAppCode(appCode);
        if (integrationModel.equals(IntegrationModel.CLIENT.name())) {
            //客户端模式
            Channel channel = MessageServerHolder.appChannelMap.get(appCode);
            ChannelFuture channelFuture = channel.writeAndFlush(message);
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    //设置状态
                    message.setStatus(MessageStatus.REQUESTED.name());
                    message.setSendCount(message.getSendCount() + 1);

                    // 记录消息请求日志
                    apiMessageLogService.createRequestPart(message);
                }
            });
        } else if (integrationModel.equals(IntegrationModel.INTERFACE.name())) {
            //状态设置为待处理
            message.setStatus(MessageStatus.WAIT_HANDLE.name());

            // 记录消息请求日志
            apiMessageLogService.createRequestPart(message);
        }

    }


消费者收到服务中心推送过来的消息时,使用如下处理器进行处理

@Slf4j
public class ConsignmentBillCreateRequestHandler extends RequestMessageHandler {

    @Override
    protected void messageOperation(RequestMessage message, Channel channel) {
        log.info("收到委托单创建消息");
        // 进行业务处理

    }
}

其父类,会发送一条消息确认的响应消息给消息中心,以通知消息中心,业务消息已经收到。

而消息中心收到这条消息后,更新消息日志。

@Slf4j
public class Message/confirm/iResponseHandler extends ResponseMessageHandler {

}

以上就是整体处理流程,消息中心发业务消息给消费者,消费者进行业务处理,反馈一条消息确认的响应消息,消息中心收到后,更新消息日志的状态,从而完成闭环。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710233.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号