栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot+Kafka+策略模式

SpringBoot+Kafka+策略模式

1.pom引入依赖

        
            org.springframework.kafka
            spring-kafka
        

2.yml文件增加配置

spring:
   kafka:
      # bootstrap-servers: 192.168.68.11:19092,192.168.68.13:19092,192.168.68.16:19092
      bootstrap-servers: localhost:9092
      producer:
         # 重试次数
         retries: 0
         # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
         acks: 1
         # 批量大小
         batch-size: 16384
         # 生产端缓冲区大小
         buffer-memory: 33554432
         key-serializer: org.apache.kafka.common.serialization.StringSerializer
         value-serializer: org.apache.kafka.common.serialization.StringSerializer
         properties:
            max.request.size: 5242880
      consumer:
         # 默认消费者组
         group-id: panoramic-operation-api
         # group-id: defaultConsumerGroup
         # 最早未被消费的offset
         auto-offset-reset: earliest
         enable-auto-commit: true
         # 批量一次最大拉取数据量
         # max-poll-records: 1000
         # 自动确认offset的时间间隔
         auto-commit-interval: 1000
         # key的序列化类
         key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
         # value的序列化类
         value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消费监听接口监听的主题不存在时
      listener:
         missing-topics-fatal: false
# kafka监听topic
kafka:
   topics: test

3.编写发送消息接口,KafkaProducerService.java

public interface KafkaProducerService {

    void send(String topic,Object object);
}

4.编写发送消息接口实现类,KafkaProducerServiceImpl.java

@Service
@Slf4j
public class KafkaProducerServiceImpl implements KafkaProducerService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Override
    public void send(String topic, Object object) {
        if (StringUtils.isBlank(topic)){
            log.info("topic is null");
            return;
        }
        kafkaTemplate.send(topic, JSON.toJSONString(object, SerializerFeature.WriteNullStringAsEmpty)).addCallback(new ListenableFutureCallback>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("发送消息失败:{}",ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult result) {
                log.info("发送消息成功:{}-{}-{}",result.getRecordmetadata().topic(), result.getRecordmetadata().partition() , result.getRecordmetadata().offset());
            }
        });
    }
}

5.通过策略模式编写监听处理,这样做不需要写过多的if else
5.1创建消息处理接口,MessageHandler.java

public interface MessageHandler {

    void handle(String message);
}

5.2创建消息处理上下文,通过Spring将实现了MessageHandler 的类加载到容器中,HandlerContext.java

@Component
public class HandlerContext {

    @Autowired
    public final Map map = new ConcurrentHashMap<>();

    public HandlerContext(Map map) {
        this.map.clear();
        map.forEach(this.map::put);
    }

    public MessageHandler getHandler(String handler){
        MessageHandler messageHandler = map.get(handler);
        if (messageHandler == null) {
            throw  new RuntimeException();
        }
        return messageHandler;
    }
}

5.3 编写消息监听总入口ConsumerListener.java

@Slf4j
@Component
public class ConsumerListener {


    @Autowired
    private HandlerContext handlerContext;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",groupId = "${spring.kafka.consumer.group-id}")
    public void listen(ConsumerRecord record) {
        log.info("监听kafka消息,topic={},partition={},offset={}",record.topic(),record.partition(),record.offset());
        String topic = record.topic();
        try {
            MessageHandler handler = handlerContext.getHandler(topic);
            String message = String.valueOf(record.value());
            handler.handle(message);
        } catch (Exception e) {
            log.error("消息策略不存在");
        }


    }
}

5.4编写具体的消息处理业务类,TestHandle.java

@Component("test")
@Slf4j
public class TestHandle implements MessageHandler{
    @Override
    public void handle(String message) {
        System.err.println("接收到数据啦===="+message);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327070.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号