前言
由于我们的新项目使用的是spring-boot,而又要把新项目中建的数据同步到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其次搞kafka又要搞集群,还要搞ZK(ZooKeeper).只是用来做一些简单的数据同步的话,有点大材小用.
没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大.功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.
实现方法
pom.xml文件如下
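<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.linuxsogood.sync</groupId>
    <artifactId>linuxsogood-sync</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <mybatis.version>3.3.1</mybatis.version>
        <mybatis.spring.version>1.2.4</mybatis.spring.version>
        <mapper.version>3.3.6</mapper.version>
        <pagehelper.version>4.1.1</pagehelper.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.2.3.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.11</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>${mybatis.spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>${pagehelper.version}</version>
        </dependency>
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>repo.spring.io.milestone</id>
            <name>Spring framework Maven Milestone Repository</name>
            <url>https://repo.spring.io/libs-milestone</url>
        </repository>
    </repositories>

    <build>
        <finalName>mybatis_generator</finalName>
        <plugins>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.2</version>
                <configuration>
                    <verbose>true</verbose>
                    <overwrite>true</overwrite>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>org.linuxsogood.sync.Starter</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>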
orm层使用了MyBatis,又使用了通用Mapper和分页插件.
kafka消费端配置
import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.broker.address}")
private String brokerAddress;
@Bean
KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map consumerConfigs() {
Map propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
生产者的配置.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Producer-side Spring Kafka configuration.
 *
 * <p>Declares the producer properties, the producer factory built from them, and
 * the {@link KafkaTemplate} used to send String-keyed, String-valued messages.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Kafka bootstrap server list, read from the {@code kafka.broker.address} property. */
    @Value("${kafka.broker.address}")
    private String brokerAddress;

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return a producer factory for String keys and String values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Assembles the Kafka producer properties.
     *
     * @return a mutable map of producer configuration keys to values
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // Retries are disabled: a failed send is not re-attempted by the client.
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Exposes the template used by application code to publish messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
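监听.监听里面写的就是业务逻辑了,也就是从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成同一条消息只有其中一个监听能处理,其它监听就处理不到这条消息了.这也就是kafka的分布式消息处理方式.(注意:在spring-kafka 1.1.x中,@KafkaListener的group属性指的是监听容器所属的分组Bean名称,并不是Kafka的消费者组;消费者组由消费端配置里的ConsumerConfig.GROUP_ID_CONFIG决定,从1.3版本开始才可以通过注解的groupId属性为单个监听指定消费者组.)
在同一个group里的监听,共同处理接收到的消息,每条消息会根据一定的分配算法交给其中一个监听来处理.如果不在同一个group,但是监听的是同一个topic的话,每个group都会收到全部消息,就会形成广播模式.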
import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.List;
import java.util.Optional;
public class Listener {
private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
@Autowired
private StoreMapper storeMapper;
@KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
public void listen(ConsumerRecord, ?> record) {
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
try {
MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
CupMessageType type = messageWrapper.getType();
//判断消息的数据类型,不同的数据入不同的表
if (CupMessageType.STORE == type) {
proceedStore(messageWrapper);
}
} catch (Exception e) {
LOGGER.error("将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}",message.toString(),e);
}
}
}
private void proceedStore(MessageWrapper messageWrapper) {
Object data = messageWrapper.getData();
Store cupStore = JSON.parseObject(data.toString(), Store.class);
StoreExample storeExample = new StoreExample();
String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
storeExample.createCriteria().andStoreNameEqualTo(storeName);
List stores = storeMapper.selectByExample(storeExample);
org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
//如果查询不到记录则新增
if (stores.size() == 0) {
storeMapper.insert(store);
} else {
store.setStoreId(stores.get(0).getStoreId());
storeMapper.updateByPrimaryKey(store);
}
}
}
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对考高分网的支持。



