这篇文章主要介绍了Spring纯Java配置集成kafka代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
KafkaConfig.java
package com.niugang.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndmetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import com.niugang.controller.SenderConttoller;
@Configuration
@EnableKafka
public class KafkaConfig {
private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
// 偏移量提交方式
// factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.COUNT);
// 异步提交偏移量(默认就是true)
// factory.getContainerProperties().setSyncCommits(true);
//回调函数经常用于记录提交错误
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public ConsumerListener listener() {
return new ConsumerListener();
}
private Map consumerProps() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka_group_1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
private Map senderProps() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1");
// 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// key的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
ConsumerListener.java
package com.niugang.config;
import org.springframework.kafka.annotation.KafkaListener;
public class ConsumerListener {
@KafkaListener(id = "foo", topics = "annotated1")
public void listen1(String foo) {
System.out.println("接收消息为:"+foo);
}
}
源码:https://gitee.com/niugangxy/kafka/tree/master/kafka-spring-boot
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。



