栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot整合Kafka

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot整合Kafka

Springboot整合Kafka

目录
  • Springboot整合Kafka
    • 结构
    • 业务层
      • 生产消息
      • 消费消息
      • 订单业务
    • 控制层
    • 测试

需求:传输订单ID,并将ID放入消息队列(生产者),最后取出消息,完成发送短信业务(消费者)。

使用spring-boot-web-starter,添加Web模块。

首先,创造一个topic:order:

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 2 --topic first
结构 业务层

MessageService.java

package com.jd.springboot_mq.service;

public interface MessageService {

    // 发送消息
    void sendMessage(String id);

}

OrderService.java

package com.jd.springboot_mq.service;

public interface OrderService {

    // 生成订单
    void order(String id);
}

生产消息

MessageServiceKafkaImpl.java

package com.jd.springboot_mq.service.impl.kafka;

import com.jd.springboot_mq.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;


@Service
public class MessageServiceKafkaImpl implements MessageService {

    //生产消息
    @Resource
    private KafkaTemplate kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        kafkaTemplate.send("order",id);
        System.out.println("待发送短信的订单已纳入处理队列(Kafka),id:"+id);
    }

}

消费消息

MessageListener.java

package com.jd.springboot_mq.service.impl.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    // 消费消息
    @KafkaListener(topics = "order")
    public void onMessage(ConsumerRecord record) {
        System.out.println("已完成短信发送业务(Kafka),id:"+record.value());
    }
}

订单业务

OrderServiceImpl.java

package com.jd.springboot_mq.service.impl;

import com.jd.springboot_mq.service.MessageService;
import com.jd.springboot_mq.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements OrderService {

    @Qualifier("messageServiceKafkaImpl")
    @Autowired
    private MessageService messageService;

    @Override
    public void order(String id) {
        //订单处理之前
        System.out.println("订单处理开始......");
        //短信的处理
        messageService.sendMessage(id);
        //后续订单处理
        System.out.println("订单处理结束......");
        System.out.println();
    }
}

控制层

OrderContoller.java

package com.jd.springboot_mq.contoller;

import com.jd.springboot_mq.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/orders")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("{id}")
    public void order(@PathVariable String id) {
        orderService.order(id);
    }
}

测试

使用Postman发送请求:

http://localhost/orders/id3

结果:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1040653.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号