功能需求1.导入依赖和配置
1.导入依赖2. 配置Kafka
修改consumer配置文件spring中配置服务连接端口与consumer 3. 访问Kafka进行测试
封装生产者发送消息
`KafkaTemplate``send(topic, data)` 封装消费者消费消息
`@KafkaListener(topics = {"test"})``ConsumerRecord`
测试发送与接收测试结果
参考牛客网高级项目教程
尚硅谷kafka教学笔记
功能需求使用SpringBoot的java代码操作kafka需要将Spring框架与Kafka整合 1.导入依赖和配置 1.导入依赖
2. 配置Kafka 修改consumer配置文件 spring中配置服务连接端口与consumerorg.springframework.kafka spring-kafka
#kafka相关配置 spring.kafka.bootstrap-servers=192.168.181.136:9092 #组id spring.kafka.consumer.group-id=community-consumer-group #获取offset后是否自动提交 spring.kafka.consumer.enable-auto-commit=true #自动提交的频率 spring.kafka.consumer.auto-commit-interval=30003. 访问Kafka进行测试 封装生产者发送消息 KafkaTemplate
Spring内置的处理kafka的模板引擎 send(topic, data)
向指定的topic主题中发送数据
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
封装消费者消费消息
@KafkaListener(topics = {"test"})
监听指定的主题消息-可以传多个主题 ConsumerRecord
将监听到的消息封装成ConsumerRecord对象,方便处理本例中将对象的值打印到控制台进行测试
@Component
class kafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
测试发送与接收
发送消息是主动立即发送消费者接收消息是被动的,根据线程分配,可能会有点延迟
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTest {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
// 延迟一段时间,让消费者读取数据
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果



