- Spring整合Kafka
- 一、引入依赖
- 二、配置kafka
- 三、测试代码--如何用kafka
- 3.1 KafkaTests
- 3.2 测试结果
二、配置kafkaorg.springframework.kafka spring-kafka
#KafkaProperties ##配端口号 spring.kafka.bootstrap-servers=localhost:9092 ##配消费者的组id spring.kafka.consumer.group-id=community-consumer-group ##是否自动提交 消费者的偏移量 spring.kafka.consumer.enable-auto-commit=true ##自动提交的频率,也就是多久提交一次——3000ms-3s spring.kafka.consumer.auto-commit-interval=3000
其中:
需进行改动(改不改无所谓)
→
先启动zookeeper和kafka
3.1 KafkaTests重点:生产者发消息,使我们主动去调的;消费者接收消息,是自动的,只需要在方法上加一个@KafkaListener(topics = {“xxxx”}),就能监听xxxx主体的生产者发来的消息,并且接收到!
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private kafkaProducer kafkaProducer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("test","你好!");
kafkaProducer.sendMessage("test","你收到信息了吗?");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//为了方便,直接在这写两个类,而不去去外面重新新建
@Component
class kafkaProducer{//生产者
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic,String context){
kafkaTemplate.send(topic,context);
}
}@Component
class kafkaConsumer{//消费者
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
3.2 测试结果



