1.pom引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.yml文件增加配置
spring:
kafka:
# bootstrap-servers: 192.168.68.11:19092,192.168.68.13:19092,192.168.68.16:19092
bootstrap-servers: localhost:9092
producer:
# 重试次数
retries: 0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
acks: 1
# 批量大小
batch-size: 16384
# 生产端缓冲区大小
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
max.request.size: 5242880
consumer:
# 默认消费者组
group-id: panoramic-operation-api
# group-id: defaultConsumerGroup
# 最早未被消费的offset
auto-offset-reset: earliest
enable-auto-commit: true
# 批量一次最大拉取数据量
# max-poll-records: 1000
# 自动确认offset的时间间隔
auto-commit-interval: 1000
# key的反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value的反序列化类
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费监听接口监听的主题不存在时
listener:
missing-topics-fatal: false
# kafka监听topic
kafka:
topics: test
3.编写发送消息接口,KafkaProducerService.java
/**
 * Sends messages to Kafka.
 */
public interface KafkaProducerService {
/**
 * Serializes {@code object} and publishes it to the given topic.
 *
 * @param topic  target Kafka topic; implementations may ignore blank topics
 * @param object payload to serialize and send
 */
void send(String topic,Object object);
}
4.编写发送消息接口实现类,KafkaProducerServiceImpl.java
@Service
@Slf4j
public class KafkaProducerServiceImpl implements KafkaProducerService {
@Autowired
private KafkaTemplate kafkaTemplate;
@Override
public void send(String topic, Object object) {
if (StringUtils.isBlank(topic)){
log.info("topic is null");
return;
}
kafkaTemplate.send(topic, JSON.toJSONString(object, SerializerFeature.WriteNullStringAsEmpty)).addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable ex) {
log.info("发送消息失败:{}",ex.getMessage());
}
@Override
public void onSuccess(SendResult result) {
log.info("发送消息成功:{}-{}-{}",result.getRecordmetadata().topic(), result.getRecordmetadata().partition() , result.getRecordmetadata().offset());
}
});
}
}
5.通过策略模式编写监听处理,这样做不需要写过多的if else
5.1创建消息处理接口,MessageHandler.java
/**
 * Strategy interface for handling a Kafka message; implementations are
 * registered as Spring beans and looked up by bean name (topic name).
 */
public interface MessageHandler {
/**
 * Processes one message payload (the record value as a String).
 *
 * @param message raw message payload
 */
void handle(String message);
}
5.2创建消息处理上下文,通过Spring将实现了MessageHandler 的类加载到容器中,HandlerContext.java
/**
 * Registry of {@link MessageHandler} beans keyed by bean name.
 * Spring injects all MessageHandler beans as a Map via the constructor,
 * so no field-level @Autowired is needed (it is invalid on a final field anyway).
 */
@Component
public class HandlerContext {

    // Bean name -> handler; ConcurrentHashMap so lookups are safe across listener threads.
    public final Map<String, MessageHandler> map = new ConcurrentHashMap<>();

    public HandlerContext(Map<String, MessageHandler> map) {
        // Defensive copy of the Spring-provided map.
        this.map.putAll(map);
    }

    /**
     * Returns the handler registered under the given key.
     *
     * @param handler bean name of the handler (by convention, the topic name)
     * @return the matching handler, never null
     * @throws IllegalArgumentException if no handler is registered for the key
     */
    public MessageHandler getHandler(String handler) {
        MessageHandler messageHandler = map.get(handler);
        if (messageHandler == null) {
            // Include the key so the failure is diagnosable from the log.
            throw new IllegalArgumentException("no MessageHandler registered for key: " + handler);
        }
        return messageHandler;
    }
}
5.3 编写消息监听总入口ConsumerListener.java
@Slf4j
@Component
public class ConsumerListener {
@Autowired
private HandlerContext handlerContext;
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",groupId = "${spring.kafka.consumer.group-id}")
public void listen(ConsumerRecord, ?> record) {
log.info("监听kafka消息,topic={},partition={},offset={}",record.topic(),record.partition(),record.offset());
String topic = record.topic();
try {
MessageHandler handler = handlerContext.getHandler(topic);
String message = String.valueOf(record.value());
handler.handle(message);
} catch (Exception e) {
log.error("消息策略不存在");
}
}
}
5.4编写具体的消息处理业务类,TestHandle.java
/**
 * Example handler for the "test" topic; the bean name must match the topic
 * so HandlerContext can resolve it.
 */
@Component("test")
@Slf4j
public class TestHandle implements MessageHandler {

    @Override
    public void handle(String message) {
        // Use the injected logger instead of System.err — the class is @Slf4j.
        log.info("接收到数据啦===={}", message);
    }
}



