零、聊天室计划取消,应用中有一个场景应用到了kafka消息队列,在主服务中会有用户注册的操作,后续需要给用户推送相关的消息发送短信。于是分为两个服务,第一个为用户服务负责注册用户,注册完后将用户保存到kafka队列中,第二个消息服务会从kafka队列拿用户信息,来进行后续的追踪。
一、代码
1、配置文件
#kafka地址 spring.kafka.longze.bootstrap-servers=10.20.30.40:9092 #kafka组 spring.kafka.longze.cust.group-id=test-kafkaGroup #topic kafka话题 kafka.testTopic=test-topic #是否开启自动提交 spring.kafka.longze.cust.enable-auto-commit=true
2、依赖
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.5.6.RELEASE'
3、kafka工具类
package com.longze.fengqx.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaLongzeConfig {
@Value("${spring.kafka.longze.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.longze.cust.group-id}")
private String groupId;
@Value("${spring.kafka.longze.cust.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean
public KafkaTemplate kafkaLongzeTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory> kafkaLongzeContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map producerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
4、生产者写入kafka --用户主服务 注册新用户
import com.alibaba.fastjson.JSON;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
public class TestKafkaSchedule {
public static final Logger logger = LoggerFactory.getLogger(TestKafkaSchedule.class);
@Value("${kafka.testTopic}")
private String kafkaTopic;
@Autowired
private KafkaTemplate kafkaLongzeTemplate;
@XxlJob("testKafkaPush")
@Transactional(rollbackFor = Exception.class)
public void testKafkaPush() throws Exception {
//推送待回溯数据-新注册用户
try {
XxlJobHelper.log("推送待回溯数据-新注册用户 start");
//生产者发送消息
ClientDTO clientDTO=new ClientDTO();
clientDTO.setClientIDs(clientService.initbaseInfo());
clientDTO.setStartDate("19700101");
kafkaLongzeTemplate.send(kafkaTopic, JSON.toJSonString(clientDTO));
XxlJobHelper.log("发送内容"+clientDTO.toString());
System.out.println("发送结束");
XxlJobHelper.log("推送待回溯数据-新注册用户 end");
} catch (Exception exception) {
throw exception;
}
}
}
5、消费者 - 消息微服务
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xxl.job.core.context.XxlJobHelper;
import org.apache.ibatis.session.SqlSession;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.text.SimpleDateFormat;
import java.util.*;
@Component
public class KafkaClientConsumer {
private static final Logger log= LoggerFactory.getLogger(KafkaClientConsumer.class);
@KafkaListener(topics={"TestTopic"},containerFactory = "kafkaLongzeContainerFactory")
@Transactional(rollbackFor = Exception.class)
public void listenKakfaConsume(ConsumerRecord, ?>record) throws Exception {
try {
log.info("kafka -TEST 新注册用户kafka接收源数据{}",record.value());
ObjectMapper objectMapper = new ObjectMapper();
ClientDTO clientDTO = objectMapper.readValue(record.value().toString(), ClientDTO.class);
log.info("kafka TEST 新注册用户 数据:{}", clientDTO.toString());
String[] userIDs= clientDTO.getUserIDs();
log.info("kafka TEST 新注册用户:{}", JSON.toJSonString(userIDs));
}
}catch (JsonProcessingException e){
e.printStackTrace();
log.error("kafka - TEST 新注册用户数据解析异常"+e.getMessage());
throw e;
}
}
}
二、在kafka正常情况下,就简单实现了kafka队列功能
上篇文章:从零开始 kafka集群部署,拒绝挖坑,每一步都经过本人调试成功
kafka简介与集群部署安装(一)kafka简介与集群部署安装(一)_无敌小田田的博客-CSDN博客零、坐标火星,leader让研究一下kafka+websocket做一套即时通讯工具出来,需求紧急,调研了一番。一、Kafka简介1、消息队列(Message Queue)Message Queue消息传送系统提供传送服务。消息传送依赖于大量支持组件,这些组件负责处理连接服务、消息的路由和传送、持久性、安全性以及日志记录。消息服务器可以使用一个或多个代理实例。JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Javhttps://blog.csdn.net/qq_36602951/article/details/121175749kafka使用+集成Java(二)
kafka使用+集成Java(二)_无敌小田田的博客-CSDN博客零、kafka集成已经整合完毕,接下来要做的就是和java打通一、https://blog.csdn.net/qq_36602951/article/details/121317250kafka+websocket示例(三)
https://blog.csdn.net/qq_36602951/article/details/121325381https://blog.csdn.net/qq_36602951/article/details/121325381
完成前三步之后,后续就可以根据业务来定制不同的功能了,
接下来将扩展websocket功能
SpringBoot+websocket构造聊天室(四)
(18条消息) SpringBoot+websocket构造聊天室(四)_无敌小田田的博客-CSDN博客https://blog.csdn.net/qq_36602951/article/details/121436617第六步:kafka实现消息队列 简单应用
(18条消息) kafka实现应用之间消息队列实战(六)_无敌小田田的博客-CSDN博客https://blog.csdn.net/qq_36602951/article/details/123151570



