Spring Boot 项目导入 Maven 依赖:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
在application.yml 添加配置
spring:
  kafka:
    bootstrap-servers: 192.168.2.104:9092,192.168.2.104:9093,192.168.2.104:9094
    producer:
      # Number of retries on send failure (disabled here).
      #retries: 0
      # Batch size in bytes for a single producer batch (16 KB).
      batch-size: 16384
      # Total memory (bytes) the producer may use to buffer records (32 MB).
      buffer-memory: 33554432
      # Serializer for record keys.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer for record values.
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Transactions require acks to be "all" (or -1).
      acks: all
      # Transaction id prefix — setting this enables producer transactions;
      # NOTE: once set, EVERY send must run inside a transaction.
      transaction-id-prefix: demo-tran-
    consumer:
      # Auto-commit interval; must use a duration format such as 1S, 1M, 2H, 5D.
      auto-commit-interval: 1S
      # What to do when there is no committed offset for a partition
      # (or the committed offset is invalid): start from the earliest record.
      auto-offset-reset: earliest
      # Offsets are committed manually (see listener.ack-mode below).
      enable-auto-commit: false
      # Deserializer for record keys.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer for record values.
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # Manual ack; the offset is committed immediately when ack() is called.
      ack-mode: manual_immediate
      # Number of listener container threads.
      concurrency: 6
      # Do not fail startup if a subscribed topic does not exist yet.
      missing-topics-fatal: false
发送消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/api/v1")
public class UserController {
private static final String TOPIC_NAME = "user.demo.topic";
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("/register/{num}")
public void sendMessage(@PathVariable("num") String num){
kafkaTemplate.send(TOPIC_NAME,String.format("消息内容:%s",num)).addCallback(succsess->{
String topic = succsess.getRecordMetadata().topic();
long offset = succsess.getRecordMetadata().offset();
int partition = succsess.getRecordMetadata().partition();
log.info("发送成功:topic->{},partition->{} , offset->{}",topic,partition,offset);
},failure->{
log.info("发送失败:{}",failure.getMessage());
});
}
@GetMapping("/register/tran1")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage2(int num){
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1 i="+num);
if (num == 0) {
throw new RuntimeException("fail");
}
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2 i="+num);
}
@GetMapping("/register/tran2")
public void sendMessage3(int num){
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
@Override
public Object doInOperations(KafkaOperations kafkaOperations) {
kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:1 i="+num);
if(num==0)
{
throw new RuntimeException("input is error");
}
kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:2 i="+num);
return true;
}
});
}
}
配置监听
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MQListener {
@KafkaListener(topics = {"user.demo.topic"},groupId = "demo-g1")
public void onMessage(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
try {
log.info("主题:topic:{},partition:{}",record.topic(),record.partition());
log.info("消费消息内容:{}",record.value());
} finally {
ack.acknowledge();
}
}
}