
Kafka in Practice: Message Queues Between Applications (Part 6)

0. The chat-room plan has been cancelled. Instead, one scenario in the application now uses a Kafka message queue: the main service handles user registration, and after a user registers we need to push follow-up messages and send SMS notifications. The work is therefore split into two services: a user service that registers users and publishes each new user to a Kafka topic, and a message service that consumes the user records from that topic for follow-up processing.

I. Code

1. Configuration file

# Kafka broker address
spring.kafka.longze.bootstrap-servers=10.20.30.40:9092
# Kafka consumer group
spring.kafka.longze.cust.group-id=test-kafkaGroup
# Kafka topic
kafka.testTopic=test-topic
# whether to enable auto-commit of consumer offsets
spring.kafka.longze.cust.enable-auto-commit=true

2. Dependency

    compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.5.6.RELEASE'
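If the project uses Maven rather than Gradle, the equivalent coordinates should be:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.6.RELEASE</version>
</dependency>
```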

3. Kafka configuration class

package com.longze.fengqx.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;


@EnableKafka
@Configuration
public class KafkaLongzeConfig {

    @Value("${spring.kafka.longze.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.longze.cust.group-id}")
    private String groupId;
    @Value("${spring.kafka.longze.cust.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate<String, String> kafkaLongzeTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaLongzeContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // three listener threads per container
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // "1": the partition leader acknowledges the write without waiting for replicas
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }


}
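Because the value (de)serializers are plain `StringSerializer`/`StringDeserializer`, the DTO has to be converted to and from JSON by hand (with fastjson on the producer side and Jackson on the consumer side below). As a sketch of an alternative, not part of the original setup, Spring Kafka ships its own JSON (de)serializers, which would let both sides work with `ClientDTO` directly, assuming the same DTO class is on both classpaths:

```java
// Sketch: swap the String (de)serializers for Spring Kafka's JSON ones.
// Producer side:
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        org.springframework.kafka.support.serializer.JsonSerializer.class);
// Consumer side:
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        org.springframework.kafka.support.serializer.JsonDeserializer.class);
// JsonDeserializer requires an allow-list of packages it may deserialize into
props.put(org.springframework.kafka.support.serializer.JsonDeserializer.TRUSTED_PACKAGES,
        "com.longze.fengqx");
```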

4. Producer writing to Kafka — the user main service, registering new users

import com.alibaba.fastjson.JSON;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;


@Component
public class TestKafkaSchedule {
    public static final Logger logger = LoggerFactory.getLogger(TestKafkaSchedule.class);

    @Value("${kafka.testTopic}")
    private String kafkaTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaLongzeTemplate;

    // service that collects the newly registered users (referenced below; its definition is omitted here)
    @Autowired
    private ClientService clientService;

    @XxlJob("testKafkaPush")
    @Transactional(rollbackFor = Exception.class)
    public void testKafkaPush() throws Exception {
        // push backfill data: newly registered users
        try {
            XxlJobHelper.log("push backfill data - newly registered users: start");
            // the producer sends the DTO as a JSON string
            ClientDTO clientDTO = new ClientDTO();
            clientDTO.setClientIDs(clientService.initbaseInfo());
            clientDTO.setStartDate("19700101");
            kafkaLongzeTemplate.send(kafkaTopic, JSON.toJSONString(clientDTO));
            XxlJobHelper.log("payload sent: " + clientDTO);
            XxlJobHelper.log("push backfill data - newly registered users: end");
        } catch (Exception exception) {
            logger.error("push backfill data failed", exception);
            throw exception;
        }
    }
}
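The record value on the wire is simply the fastjson rendering of `ClientDTO`. As a rough illustration (field names are assumed from the setters above, and fastjson's actual key ordering and escaping may differ), the payload is a plain JSON string along these lines:

```java
// Hand-rolled sketch of the JSON payload the producer sends (stdlib only).
// Field names are assumed from ClientDTO's setters; this is not fastjson itself.
public class PayloadSketch {
    static String toJson(String[] clientIDs, String startDate) {
        StringBuilder sb = new StringBuilder("{\"clientIDs\":[");
        for (int i = 0; i < clientIDs.length; i++) {
            if (i > 0) sb.append(',');
            sb.append('"').append(clientIDs[i]).append('"');
        }
        sb.append("],\"startDate\":\"").append(startDate).append("\"}");
        return sb.toString();
    }

    public static void main(String[] args) {
        // the record value published to the topic would look like this
        System.out.println(toJson(new String[]{"u1", "u2"}, "19700101"));
    }
}
```

Because both sides exchange an ordinary string, the consumer is free to parse it with a different JSON library, which is exactly what the message microservice below does with Jackson.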

5. Consumer — the message microservice

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;


@Component
public class KafkaClientConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaClientConsumer.class);

    // the topic must match the one the producer writes to (kafka.testTopic = test-topic)
    @KafkaListener(topics = {"${kafka.testTopic}"}, containerFactory = "kafkaLongzeContainerFactory")
    @Transactional(rollbackFor = Exception.class)
    public void listenKafkaConsume(ConsumerRecord<String, String> record) throws Exception {
        try {
            log.info("kafka - TEST raw payload for newly registered users: {}", record.value());
            ObjectMapper objectMapper = new ObjectMapper();
            ClientDTO clientDTO = objectMapper.readValue(record.value(), ClientDTO.class);
            log.info("kafka - TEST parsed payload: {}", clientDTO);
            String[] userIDs = clientDTO.getUserIDs();
            log.info("kafka - TEST newly registered users: {}", JSON.toJSONString(userIDs));
        } catch (JsonProcessingException e) {
            log.error("kafka - TEST failed to parse newly registered user payload", e);
            throw e;
        }
    }
}
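Note that the configuration above relies on offset auto-commit, so an offset can be committed before a record is fully processed and a crash in the listener can lose that message. Where that matters, a common alternative (sketched here under assumptions, not part of the original code) is manual acknowledgment:

```java
// Sketch: manual offset commit. Requires enable-auto-commit=false in the consumer
// config and, on the container factory:
//   factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// Acknowledgment is org.springframework.kafka.support.Acknowledgment.
@KafkaListener(topics = {"${kafka.testTopic}"}, containerFactory = "kafkaLongzeContainerFactory")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        // ... process record.value() ...
        ack.acknowledge();   // commit the offset only after successful processing
    } catch (Exception e) {
        // offset is not committed; the record will be redelivered on the next poll
        throw e;
    }
}
```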

II. With Kafka itself running normally, that is all it takes to get a simple message-queue flow working.

Previous articles in this series:

- Kafka introduction and cluster deployment (Part 1): https://blog.csdn.net/qq_36602951/article/details/121175749
- Using Kafka and integrating it with Java (Part 2): https://blog.csdn.net/qq_36602951/article/details/121317250
- Kafka + WebSocket example (Part 3): https://blog.csdn.net/qq_36602951/article/details/121325381
- Building a chat room with Spring Boot + WebSocket (Part 4): https://blog.csdn.net/qq_36602951/article/details/121436617
- Kafka in practice: message queues between applications (Part 6): https://blog.csdn.net/qq_36602951/article/details/123151570

After completing the first three parts you can build out features to suit your own business; the next step will extend the WebSocket functionality.

Reprinted from www.mshxw.com. Original article: https://www.mshxw.com/it/748127.html