- Integrating Kafka with Spring Boot
- Structure
- Service layer
- Producing messages
- Consuming messages
- Order business logic
- Controller layer
- Testing
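Requirement: pass in an order ID and publish it to a message queue (producer); a consumer then takes the message off the queue and carries out the SMS-sending step.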
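The producer/consumer split in this requirement can be pictured with a small in-memory sketch before Kafka enters the picture. It is only an illustration: the class `InMemoryOrderFlow` and its methods are hypothetical names, and a `BlockingQueue` stands in for the Kafka topic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the message flow; Kafka is NOT involved here.
class InMemoryOrderFlow {
    // Hypothetical queue playing the role of the "order" topic.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: enqueue the order ID and return immediately.
    void produce(String orderId) {
        queue.add(orderId);
    }

    // Consumer side: wait up to timeoutMillis for the next order ID and "send" the SMS.
    // Returns the handled ID, or null if nothing arrived in time.
    String consume(long timeoutMillis) {
        try {
            String orderId = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (orderId != null) {
                System.out.println("SMS sent for order id: " + orderId);
            }
            return orderId;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that nothing was handled.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        InMemoryOrderFlow flow = new InMemoryOrderFlow();
        flow.produce("id3");
        flow.consume(1000);
    }
}
```

The point of the split is that `produce` returns without waiting for the SMS step; in the rest of this post Kafka takes the place of the in-memory queue.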
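Add the web module with spring-boot-starter-web. The Kafka classes used below (KafkaTemplate, @KafkaListener) come from the spring-kafka dependency, which also needs to be on the classpath.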
First, create a topic named order:
kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 2 --topic order
Structure
Service layer
MessageService.java
package com.jd.springboot_mq.service;

public interface MessageService {
    // Send a message
    void sendMessage(String id);
}
OrderService.java
package com.jd.springboot_mq.service;

public interface OrderService {
    // Create an order
    void order(String id);
}
Producing messages
MessageServiceKafkaImpl.java
package com.jd.springboot_mq.service.impl.kafka;

import com.jd.springboot_mq.service.MessageService;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

@Service
public class MessageServiceKafkaImpl implements MessageService {
    // Produces messages; injected by bean name from Spring Boot's Kafka auto-configuration
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        // Publish the order ID to the "order" topic (send() is asynchronous)
        kafkaTemplate.send("order", id);
        System.out.println("Order awaiting SMS has been queued (Kafka), id: " + id);
    }
}
Consuming messages
MessageListener.java
package com.jd.springboot_mq.service.impl.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    // Consumes messages from the "order" topic; a consumer group ID is assumed
    // to be configured elsewhere, e.g. via spring.kafka.consumer.group-id
    @KafkaListener(topics = "order")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("SMS sending completed (Kafka), id: " + record.value());
    }
}
Order business logic
OrderServiceImpl.java
package com.jd.springboot_mq.service.impl;

import com.jd.springboot_mq.service.MessageService;
import com.jd.springboot_mq.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements OrderService {
    // Select the Kafka-backed implementation by bean name
    @Qualifier("messageServiceKafkaImpl")
    @Autowired
    private MessageService messageService;

    @Override
    public void order(String id) {
        // Work done before the order is handed off
        System.out.println("Order processing started......");
        // Hand the SMS step to the message queue
        messageService.sendMessage(id);
        // Remaining order processing
        System.out.println("Order processing finished......");
        System.out.println();
    }
}
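Because OrderServiceImpl depends only on the MessageService interface, the order logic can be exercised without a Kafka broker by swapping in a different implementation. The following is a minimal sketch of that idea, not the author's code: the two interfaces are re-declared locally so the example compiles on its own, and `RecordingMessageService`, `SimpleOrderService`, and `OrderServiceSketch` are hypothetical names. Constructor injection is used here in place of the `@Autowired` field.

```java
import java.util.ArrayList;
import java.util.List;

// Local copies of the two interfaces so the sketch compiles on its own.
interface MessageService {
    void sendMessage(String id);
}

interface OrderService {
    void order(String id);
}

// Hypothetical test double: records IDs instead of publishing to Kafka.
class RecordingMessageService implements MessageService {
    final List<String> sentIds = new ArrayList<>();

    @Override
    public void sendMessage(String id) {
        sentIds.add(id);
    }
}

// Same control flow as OrderServiceImpl, with the dependency passed through the constructor.
class SimpleOrderService implements OrderService {
    private final MessageService messageService;

    SimpleOrderService(MessageService messageService) {
        this.messageService = messageService;
    }

    @Override
    public void order(String id) {
        System.out.println("Order processing started...");
        messageService.sendMessage(id);
        System.out.println("Order processing finished...");
    }
}

class OrderServiceSketch {
    public static void main(String[] args) {
        RecordingMessageService messages = new RecordingMessageService();
        new SimpleOrderService(messages).order("id3");
        System.out.println("Recorded ids: " + messages.sentIds);
    }
}
```

In the Spring application the same substitution is what `@Qualifier` enables: a second MessageService bean could be selected by changing the qualifier name.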
Controller layer
OrderController.java
package com.jd.springboot_mq.contoller;

import com.jd.springboot_mq.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private OrderService orderService;

    // POST /orders/{id} triggers order processing for the given ID
    @PostMapping("{id}")
    public void order(@PathVariable String id) {
        orderService.order(id);
    }
}
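Testing
Send a POST request with Postman:
http://localhost/orders/id3
Result: the console shows the order-processing start and end lines and the producer's queued message, followed by the listener's SMS-completed message for id3 once the record has been consumed.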
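As an alternative to Postman, the same request could be sent from Java with the JDK's built-in HttpClient (Java 11 or later). This is a sketch under assumptions: `OrderRequestSketch` and `buildOrderRequest` are hypothetical names, and the application is assumed to be listening on port 80, as the URL above implies.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class OrderRequestSketch {
    // Builds a POST to /orders/{id} with an empty body, matching @PostMapping("{id}").
    static HttpRequest buildOrderRequest(String baseUrl, String id) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/orders/" + id))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildOrderRequest("http://localhost", "id3");
        // Sending only succeeds while the Spring Boot application is running.
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

The response body is discarded because the controller method returns void; only the status code is printed.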



