栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot kafka发送消息

springboot kafka发送消息

  • 场景:kafka发送消息,并且根据消息发送到不同的渠道类型(例如发送到WX,DingDing,邮箱),采取不同的线程池处理

    1.引入依赖

    		
    			org.springframework.kafka
    			spring-kafka
    			2.7.8
    		
    
    		
    			com.alibaba
    			fastjson
    			1.2.78
    		
    

    2.kafka配置信息

    sobev.kafka.ip=xxx.x.x.x
    sobev.kafka.port=9092
    sobev.business.topic.name=sobevBusiness
    
    spring.kafka.bootstrap-servers=${sobev.kafka.ip}:${sobev.kafka.port}
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.auto.offset.reset=earliest
    spring.kafka.consumer.auto-commit-interval=1000
    spring.kafka.consumer.enable-auto-commit=true
    

    3.发送消息工具类

    @Component
    @Slf4j
    public class KafkaUtils {
        @Autowired
         private KafkaTemplate kafkaTemplate;
        
        public void send(String topicName, String jsonMessage) {
            kafkaTemplate.send(topicName, jsonMessage);
        }
    }
    

    4.注册kafka消息监听器接收消息
    由于存在多种消息发送渠道和消息类型,因此需要多个监听器监听不同渠道的不同类型,但是不着急,不用急着把所有类型都写一个Listener

    //不同的渠道不同的消息类型都设为单独的类型,由不同consumer消费
    public class GroupIdMappingUtils {
        
        public static List getAllGroupIds() {
            List groupIds = new ArrayList<>();
            for (ChannelType channelType : ChannelType.values()) {
                for (MessageType messageType : MessageType.values()) {
                    groupIds.add(channelType.getCodeEn() + "." + messageType.getCodeEn());
                }
            }
            return groupIds;
        }
    }
    

    定义一个kafka消息监听器

    @Component
    //prototype类型 意味着每次请求类都将返回一个新的类
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    @Slf4j
    public class KafkaReceiver {
    
        @Autowired
        private TaskPendingHolder taskPendingHolder;
    
        @KafkaListener(topics = "#{'${sobev.business.topic.name}'}")
        public void consumer(ConsumerRecord consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId){
            Optional kafkaMessage = Optional.of(consumerRecord.value());
            if(kafkaMessage.isPresent()){
                String s = kafkaMessage.get();
                TaskInfo taskInfo = JSON.parseObject(s, TaskInfo.class);
                if((taskInfo.getSendChannel() + "." + taskInfo.getMsgType()).equals(topicGroupId)){
                    System.out.println(topicGroupId + " :" + s);
                    //根据当前 topicGroupId 路由到不同的线程池处理
                  taskPendingHolder.route(topicGroupId).execute(具体任务...)
                }
            }
        }
    }
    

    根据prototype的特性,注册多个监听器

    Service
    public class KafkaReceiverStarter {
    
        @Autowired
        ApplicationContext applicationContext;
        
        @Autowired
        private TaskPendingHolder taskPendingHolder;
    
        private static List groupIds = GroupIdMappingUtils.getAllGroupIds();
    
        private static Integer idx = 0;
    
        
        @PostConstruct
        public void init(){
            for (int i = 0; i < groupIds.size(); i++) {
                applicationContext.getBean(KafkaReceiver.class);
            }
        }
        
        @Bean
        public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer(){
            return (attrs, element) -> {
            //设置@KafkaListener内的groupId属性
                attrs.put("groupId", groupIds.get(idx++));
                return attrs;
            };
        }
    }
    

    定义线程池持有容器

    @Component
    public class TaskPendingHolder {
        private Map taskPendingHolder = new HashMap<>(32);
    
        
        private static List groupIds = GroupIdMappingUtils.getAllGroupIds();
    
        
        @PostConstruct
        public void init() {
            for (String groupId : groupIds) {
            //自定义线程池生成器 根据groupId生成不同的线程池
                MyExecutor executor = MyThreadPoolConfig.getExecutor(groupId);
                
                taskPendingHolder.put(groupId, executor);
            }
        }
        
        public ExecutorService route(String groupId) {
            return taskPendingHolder.get(groupId);
        }
    
    
    }
    
    

    未完。。。

    参考于github austin 消息推送平台

  • 转载请注明:文章转载自 www.mshxw.com
    本文地址:https://www.mshxw.com/it/780034.html
    我们一直用心在做
    关于我们 文章归档 网站地图 联系我们

    版权所有 (c)2021-2022 MSHXW.COM

    ICP备案号:晋ICP备2021003244-6号